Merge remote-tracking branch 'prometheus/main' into feat/promote-attributes

Arve Knudsen 2024-07-18 10:21:43 +02:00
commit ad0a30cdd8
6 changed files with 392 additions and 100 deletions

View file

@@ -137,6 +137,18 @@ will be used.
Expired block cleanup happens in the background. It may take up to two hours
to remove expired blocks. Blocks must be fully expired before they are removed.
## Right-Sizing Retention Size

If you are utilizing `storage.tsdb.retention.size` to set a size limit, you
will want to consider the right size for this value relative to the storage you
have allocated for Prometheus. It is wise to reduce the retention size to provide
a buffer, ensuring that older entries will be removed before the allocated storage
for Prometheus becomes full.

At present, we recommend setting the retention size to, at most, 80-85% of your
allocated Prometheus disk space. This increases the likelihood that older entries
will be removed prior to hitting any disk limitations.
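For example (illustrative numbers, not part of this change): on a node with 500GB of disk allocated to Prometheus, the 80% guideline works out to a limit of roughly 400GB, e.g. `--storage.tsdb.retention.size=400GB`.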
## Remote storage integrations

Prometheus's local storage is limited to a single node's scalability and durability.

View file

@@ -73,22 +73,32 @@ eval instant at 50m histogram_count(testhistogram3)
{start="positive"} 110
{start="negative"} 20
# Classic way of accessing the count still works.
eval instant at 50m testhistogram3_count
testhistogram3_count{start="positive"} 110
testhistogram3_count{start="negative"} 20
# Test histogram_sum.
eval instant at 50m histogram_sum(testhistogram3)
{start="positive"} 330
{start="negative"} 80
# Classic way of accessing the sum still works.
eval instant at 50m testhistogram3_sum
testhistogram3_sum{start="positive"} 330
testhistogram3_sum{start="negative"} 80
# Test histogram_avg. This has no classic equivalent.
eval instant at 50m histogram_avg(testhistogram3)
{start="positive"} 3
{start="negative"} 4
# Test histogram_stddev. This has no classic equivalent.
eval instant at 50m histogram_stddev(testhistogram3)
{start="positive"} 2.8189265757336734
{start="negative"} 4.182715937754936
# Test histogram_stdvar. This has no classic equivalent.
eval instant at 50m histogram_stdvar(testhistogram3)
{start="positive"} 7.946347039377573
{start="negative"} 17.495112615949154
@@ -103,137 +113,282 @@ eval instant at 50m histogram_fraction(0, 0.2, rate(testhistogram3[5m]))
{start="positive"} 0.6363636363636364
{start="negative"} 0
# In the classic histogram, we can access the corresponding bucket (if
# it exists) and divide by the count to get the same result.
eval instant at 50m testhistogram3_bucket{le=".2"} / ignoring(le) testhistogram3_count
{start="positive"} 0.6363636363636364
eval instant at 50m rate(testhistogram3_bucket{le=".2"}[5m]) / ignoring(le) rate(testhistogram3_count[5m])
{start="positive"} 0.6363636363636364
# Test histogram_quantile, native and classic.
eval instant at 50m histogram_quantile(0, testhistogram3)
{start="positive"} 0
{start="negative"} -0.25
eval instant at 50m histogram_quantile(0, testhistogram3_bucket)
{start="positive"} 0
{start="negative"} -0.25
eval instant at 50m histogram_quantile(0.25, testhistogram3)
{start="positive"} 0.055
{start="negative"} -0.225
eval instant at 50m histogram_quantile(0.25, testhistogram3_bucket)
{start="positive"} 0.055
{start="negative"} -0.225
eval instant at 50m histogram_quantile(0.5, testhistogram3)
{start="positive"} 0.125
{start="negative"} -0.2
eval instant at 50m histogram_quantile(0.5, testhistogram3_bucket)
{start="positive"} 0.125
{start="negative"} -0.2
eval instant at 50m histogram_quantile(0.75, testhistogram3)
{start="positive"} 0.45
{start="negative"} -0.15
eval instant at 50m histogram_quantile(0.75, testhistogram3_bucket)
{start="positive"} 0.45
{start="negative"} -0.15
eval instant at 50m histogram_quantile(1, testhistogram3)
{start="positive"} 1
{start="negative"} -0.1
eval instant at 50m histogram_quantile(1, testhistogram3_bucket)
{start="positive"} 1
{start="negative"} -0.1
# Quantile too low.
eval_warn instant at 50m histogram_quantile(-0.1, testhistogram)
{start="positive"} -Inf
{start="negative"} -Inf
eval_warn instant at 50m histogram_quantile(-0.1, testhistogram_bucket)
{start="positive"} -Inf
{start="negative"} -Inf
# Quantile too high.
eval_warn instant at 50m histogram_quantile(1.01, testhistogram)
{start="positive"} +Inf
{start="negative"} +Inf
eval_warn instant at 50m histogram_quantile(1.01, testhistogram_bucket)
{start="positive"} +Inf
{start="negative"} +Inf
# Quantile invalid.
eval_warn instant at 50m histogram_quantile(NaN, testhistogram)
{start="positive"} NaN
{start="negative"} NaN
eval_warn instant at 50m histogram_quantile(NaN, testhistogram_bucket)
{start="positive"} NaN
{start="negative"} NaN
# Quantile value in lowest bucket.
eval instant at 50m histogram_quantile(0, testhistogram)
{start="positive"} 0
{start="negative"} -0.2
eval instant at 50m histogram_quantile(0, testhistogram_bucket)
{start="positive"} 0
{start="negative"} -0.2
# Quantile value in highest bucket.
eval instant at 50m histogram_quantile(1, testhistogram)
{start="positive"} 1
{start="negative"} 0.3
eval instant at 50m histogram_quantile(1, testhistogram_bucket)
{start="positive"} 1
{start="negative"} 0.3
# Finally some useful quantiles.
eval instant at 50m histogram_quantile(0.2, testhistogram)
{start="positive"} 0.048
{start="negative"} -0.2
eval instant at 50m histogram_quantile(0.2, testhistogram_bucket)
{start="positive"} 0.048
{start="negative"} -0.2
eval instant at 50m histogram_quantile(0.5, testhistogram)
{start="positive"} 0.15
{start="negative"} -0.15
eval instant at 50m histogram_quantile(0.5, testhistogram_bucket)
{start="positive"} 0.15
{start="negative"} -0.15
eval instant at 50m histogram_quantile(0.8, testhistogram)
{start="positive"} 0.72
{start="negative"} 0.3
eval instant at 50m histogram_quantile(0.8, testhistogram_bucket)
{start="positive"} 0.72
{start="negative"} 0.3
# More realistic with rates.
eval instant at 50m histogram_quantile(0.2, rate(testhistogram[5m]))
{start="positive"} 0.048
{start="negative"} -0.2
eval instant at 50m histogram_quantile(0.2, rate(testhistogram_bucket[5m]))
{start="positive"} 0.048
{start="negative"} -0.2
eval instant at 50m histogram_quantile(0.5, rate(testhistogram[5m]))
{start="positive"} 0.15
{start="negative"} -0.15
eval instant at 50m histogram_quantile(0.5, rate(testhistogram_bucket[5m]))
{start="positive"} 0.15
{start="negative"} -0.15
eval instant at 50m histogram_quantile(0.8, rate(testhistogram[5m]))
{start="positive"} 0.72
{start="negative"} 0.3
eval instant at 50m histogram_quantile(0.8, rate(testhistogram_bucket[5m]))
{start="positive"} 0.72
{start="negative"} 0.3
# Want results exactly in the middle of the bucket.
eval instant at 7m histogram_quantile(1./6., testhistogram2)
{} 1
eval instant at 7m histogram_quantile(1./6., testhistogram2_bucket)
{} 1
eval instant at 7m histogram_quantile(0.5, testhistogram2)
{} 3
eval instant at 7m histogram_quantile(0.5, testhistogram2_bucket)
{} 3
eval instant at 7m histogram_quantile(5./6., testhistogram2)
{} 5
eval instant at 7m histogram_quantile(5./6., testhistogram2_bucket)
{} 5
eval instant at 47m histogram_quantile(1./6., rate(testhistogram2[15m]))
{} 1
eval instant at 47m histogram_quantile(1./6., rate(testhistogram2_bucket[15m]))
{} 1
eval instant at 47m histogram_quantile(0.5, rate(testhistogram2[15m]))
{} 3
eval instant at 47m histogram_quantile(0.5, rate(testhistogram2_bucket[15m]))
{} 3
eval instant at 47m histogram_quantile(5./6., rate(testhistogram2[15m]))
{} 5
eval instant at 47m histogram_quantile(5./6., rate(testhistogram2_bucket[15m]))
{} 5
# Aggregated histogram: Everything in one. Note how native histograms
# don't require aggregation by le.
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds[5m])))
{} 0.075
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le))
{} 0.075
eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds[5m])))
{} 0.1277777777777778
eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le))
{} 0.1277777777777778
# Aggregated histogram: Everything in one. Now with avg, which does not change anything.
eval instant at 50m histogram_quantile(0.3, avg(rate(request_duration_seconds[5m])))
{} 0.075
eval instant at 50m histogram_quantile(0.3, avg(rate(request_duration_seconds_bucket[5m])) by (le))
{} 0.075
eval instant at 50m histogram_quantile(0.5, avg(rate(request_duration_seconds[5m])))
{} 0.12777777777777778
eval instant at 50m histogram_quantile(0.5, avg(rate(request_duration_seconds_bucket[5m])) by (le))
{} 0.12777777777777778
# Aggregated histogram: By instance.
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds[5m])) by (instance))
{instance="ins1"} 0.075
{instance="ins2"} 0.075
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, instance))
{instance="ins1"} 0.075
{instance="ins2"} 0.075
eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds[5m])) by (instance))
{instance="ins1"} 0.1333333333
{instance="ins2"} 0.125
eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, instance))
{instance="ins1"} 0.1333333333
{instance="ins2"} 0.125
# Aggregated histogram: By job.
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds[5m])) by (job))
{job="job1"} 0.1
{job="job2"} 0.0642857142857143
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, job))
{job="job1"} 0.1
{job="job2"} 0.0642857142857143
eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds[5m])) by (job))
{job="job1"} 0.14
{job="job2"} 0.1125
eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, job))
{job="job1"} 0.14
{job="job2"} 0.1125
# Aggregated histogram: By job and instance.
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds[5m])) by (job, instance))
{instance="ins1", job="job1"} 0.11
{instance="ins2", job="job1"} 0.09
{instance="ins1", job="job2"} 0.06
{instance="ins2", job="job2"} 0.0675
eval instant at 50m histogram_quantile(0.3, sum(rate(request_duration_seconds_bucket[5m])) by (le, job, instance))
{instance="ins1", job="job1"} 0.11
{instance="ins2", job="job1"} 0.09
{instance="ins1", job="job2"} 0.06
{instance="ins2", job="job2"} 0.0675
eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds[5m])) by (job, instance))
{instance="ins1", job="job1"} 0.15
{instance="ins2", job="job1"} 0.1333333333333333
{instance="ins1", job="job2"} 0.1
{instance="ins2", job="job2"} 0.1166666666666667
eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bucket[5m])) by (le, job, instance))
{instance="ins1", job="job1"} 0.15
{instance="ins2", job="job1"} 0.1333333333333333
@@ -241,18 +396,32 @@ eval instant at 50m histogram_quantile(0.5, sum(rate(request_duration_seconds_bu
{instance="ins2", job="job2"} 0.1166666666666667
# The unaggregated histogram for comparison. Same result as the previous one.
eval instant at 50m histogram_quantile(0.3, rate(request_duration_seconds[5m]))
{instance="ins1", job="job1"} 0.11
{instance="ins2", job="job1"} 0.09
{instance="ins1", job="job2"} 0.06
{instance="ins2", job="job2"} 0.0675
eval instant at 50m histogram_quantile(0.3, rate(request_duration_seconds_bucket[5m]))
{instance="ins1", job="job1"} 0.11
{instance="ins2", job="job1"} 0.09
{instance="ins1", job="job2"} 0.06
{instance="ins2", job="job2"} 0.0675
eval instant at 50m histogram_quantile(0.5, rate(request_duration_seconds[5m]))
{instance="ins1", job="job1"} 0.15
{instance="ins2", job="job1"} 0.13333333333333333
{instance="ins1", job="job2"} 0.1
{instance="ins2", job="job2"} 0.11666666666666667
eval instant at 50m histogram_quantile(0.5, rate(request_duration_seconds_bucket[5m]))
{instance="ins1", job="job1"} 0.15
{instance="ins2", job="job1"} 0.13333333333333333
{instance="ins1", job="job2"} 0.1
{instance="ins2", job="job2"} 0.11666666666666667
# All NHCBs summed into one.
eval instant at 50m sum(request_duration_seconds)
{} {{schema:-53 count:250 custom_values:[0.1 0.2] buckets:[100 90 60]}}
@@ -303,11 +472,13 @@ load_with_nhcb 5m
eval instant at 50m histogram_quantile(0.2, rate(empty_bucket[5m]))
{instance="ins1", job="job1"} NaN
# Load a duplicate histogram with a different name to test failure scenario on multiple histograms with the same label set.
# https://github.com/prometheus/prometheus/issues/9910
load_with_nhcb 5m
request_duration_seconds2_bucket{job="job1", instance="ins1", le="0.1"} 0+1x10
request_duration_seconds2_bucket{job="job1", instance="ins1", le="0.2"} 0+3x10
request_duration_seconds2_bucket{job="job1", instance="ins1", le="+Inf"} 0+4x10
eval_fail instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*_bucket"})
eval_fail instant at 50m histogram_quantile(0.99, {__name__=~"request_duration_seconds\\d*"})
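# The pattern running through these tests, shown side by side (a sketch;
# the 0.9 quantile is illustrative and not taken from the cases above):
# histogram_quantile(0.9, sum(rate(request_duration_seconds[5m])))                 <- native: no le needed
# histogram_quantile(0.9, sum(rate(request_duration_seconds_bucket[5m])) by (le))  <- classic: aggregate _bucket by le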

View file

@@ -848,10 +848,11 @@ func (a *headAppender) Commit() (err error) {
inOrderMint int64 = math.MaxInt64
inOrderMaxt int64 = math.MinInt64
oooMinT int64 = math.MaxInt64
oooMaxT int64 = math.MinInt64
wblSamples []record.RefSample
oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
oooMmapMarkersCount int
oooRecords [][]byte
oooCapMax = a.head.opts.OutOfOrderCapMax.Load()
series *memSeries
@@ -872,6 +873,7 @@ func (a *headAppender) Commit() (err error) {
// WBL is not enabled. So no need to collect.
wblSamples = nil
oooMmapMarkers = nil
oooMmapMarkersCount = 0
return
}
// The m-map happens before adding a new sample. So we collect
@@ -880,13 +882,15 @@ func (a *headAppender) Commit() (err error) {
// WBL Before this Commit(): [old samples before this commit for chunk 1]
// WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3]
if oooMmapMarkers != nil {
markers := make([]record.RefMmapMarker, 0, oooMmapMarkersCount)
for ref, mmapRefs := range oooMmapMarkers {
for _, mmapRef := range mmapRefs {
markers = append(markers, record.RefMmapMarker{
Ref: ref,
MmapRef: mmapRef,
})
}
}
r := enc.MmapMarkers(markers, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
}
@@ -928,32 +932,39 @@ func (a *headAppender) Commit() (err error) {
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
// r != nil means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
} }
}
if ok {
wblSamples = append(wblSamples, s)
if s.T < oooMinT {
oooMinT = s.T
}
if s.T > oooMaxT {
oooMaxT = s.T
}
floatOOOAccepted++
} else {
@@ -1053,7 +1064,7 @@ func (a *headAppender) Commit() (err error) {
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOOAccepted))
a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT)
collectOOORecords()
if a.head.wbl != nil {
@@ -1069,14 +1080,14 @@ func (a *headAppender) Commit() (err error) {
}
// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
s.ooo = &memSeriesOOOFields{}
}
c := s.ooo.oooHeadChunk
if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
chunkCreated = true
}
@@ -1089,7 +1100,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
c.maxTime = t
}
}
return ok, chunkCreated, mmapRefs
}
// chunkOpts are chunk-level options that are passed when appending to a memSeries.
@@ -1431,7 +1442,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper)
s.ooo.oooHeadChunk = &oooHeadChunk{
@@ -1443,21 +1454,29 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk
return s.ooo.oooHeadChunk, ref
}
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef {
if s.ooo == nil || s.ooo.oooHeadChunk == nil {
// OOO is not enabled or there is no head chunk, so nothing to m-map here.
return nil
}
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
if err != nil {
handleChunkWriteError(err)
return nil
}
chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1)
for _, memchunk := range chks {
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError)
chunkRefs = append(chunkRefs, chunkRef)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(memchunk.chunk.NumSamples()),
minTime: memchunk.minTime,
maxTime: memchunk.maxTime,
})
}
s.ooo.oooHeadChunk = nil
return chunkRefs
}
// mmapChunks will m-map all but first chunk on s.headChunks list.

View file

@@ -4730,6 +4730,14 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
// TestWBLReplay checks the replay at a low level.
func TestWBLReplay(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testWBLReplay(t, scenario)
})
}
}
func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
require.NoError(t, err)
@@ -4745,11 +4753,11 @@
require.NoError(t, err)
require.NoError(t, h.Init(0))
var expOOOSamples []chunks.Sample
l := labels.FromStrings("foo", "bar")
appendSample := func(mins int64, val float64, isOOO bool) {
app := h.Appender(context.Background())
ts, v := mins*time.Minute.Milliseconds(), val
_, err := app.Append(0, l, ts, v)
require.NoError(t, err)
require.NoError(t, app.Commit())
@@ -4760,15 +4768,15 @@
}
// In-order sample.
appendSample(60, 60, false)
// Out of order samples.
appendSample(40, 40, true)
appendSample(35, 35, true)
appendSample(50, 50, true)
appendSample(55, 55, true)
appendSample(59, 59, true)
appendSample(31, 31, true)
// Check that Head's time ranges are set properly.
require.Equal(t, 60*time.Minute.Milliseconds(), h.MinTime())
@@ -4792,22 +4800,23 @@
require.False(t, ok)
require.NotNil(t, ms)
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
require.Len(t, chks, 1)
it := chks[0].chunk.Iterator(nil)
actOOOSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
// OOO chunk will be sorted. Hence sort the expected samples.
sort.Slice(expOOOSamples, func(i, j int) bool {
return expOOOSamples[i].T() < expOOOSamples[j].T()
})
// Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers
// from being factored into the sample comparison.
// TODO(fionaliao): understand counter reset behaviour, might want to modify this later
requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true)
require.NoError(t, h.Close())
}

View file

@@ -17,9 +17,10 @@ import (
"fmt"
"sort"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
@@ -74,24 +75,22 @@ func (o *OOOChunk) NumSamples() int {
return len(o.samples)
}
// ToEncodedChunks returns chunks with the samples in the OOOChunk.
//
//nolint:revive // unexported-return.
func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) {
if len(o.samples) == 0 {
return nil, nil
}
// The most common case is that there will be a single chunk, with the same type of samples in it - this is always true for float samples.
chks = make([]memChunk, 0, 1)
var (
cmint int64
cmaxt int64
chunk chunkenc.Chunk
app chunkenc.Appender
)
prevEncoding := chunkenc.EncNone // Yes we could call the chunk for this, but this is more efficient.
for _, s := range o.samples {
if s.t < mint {
continue
@@ -99,9 +98,77 @@ func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk,
if s.t > maxt {
break
}
encoding := chunkenc.EncXOR
if s.h != nil {
encoding = chunkenc.EncHistogram
} else if s.fh != nil {
encoding = chunkenc.EncFloatHistogram
}
// prevApp is the appender for the previous sample.
prevApp := app
if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram
if prevEncoding != chunkenc.EncNone {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
cmint = s.t
switch encoding {
case chunkenc.EncXOR:
chunk = chunkenc.NewXORChunk()
case chunkenc.EncHistogram:
chunk = chunkenc.NewHistogramChunk()
case chunkenc.EncFloatHistogram:
chunk = chunkenc.NewFloatHistogramChunk()
default:
chunk = chunkenc.NewXORChunk()
}
app, err = chunk.Appender()
if err != nil {
return
}
}
switch encoding {
case chunkenc.EncXOR:
app.Append(s.t, s.f)
case chunkenc.EncHistogram:
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
)
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.t, s.h, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
case chunkenc.EncFloatHistogram:
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
)
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.t, s.fh, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
}
cmaxt = s.t
prevEncoding = encoding
}
if prevEncoding != chunkenc.EncNone {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
return chks, nil
}
var _ BlockReader = &OOORangeHead{}
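For orientation, a minimal sketch of how a caller consumes ToEncodedChunks, mirroring mmapCurrentOOOHeadChunk in the diff above; flushOOO and writeChunk are hypothetical stand-ins, assumed to live in package tsdb with the imports already shown:

// Sketch only: flush an OOOChunk to disk as one chunk per encoding run.
// writeChunk stands in for a chunkDiskMapper.WriteChunk-style callback.
func flushOOO(o *OOOChunk, writeChunk func(mint, maxt int64, c chunkenc.Chunk) chunks.ChunkDiskMapperRef) ([]chunks.ChunkDiskMapperRef, error) {
	// ToEncodedChunks returns one memChunk per run of same-encoding samples,
	// so a series mixing floats and histograms yields several chunks.
	chks, err := o.ToEncodedChunks(math.MinInt64, math.MaxInt64)
	if err != nil {
		return nil, err
	}
	refs := make([]chunks.ChunkDiskMapperRef, 0, len(chks))
	for _, mc := range chks {
		// Use each chunk's own min/max time, not the whole head chunk's range.
		refs = append(refs, writeChunk(mc.minTime, mc.maxTime, mc.chunk))
	}
	return refs, nil
}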

View file

@@ -108,11 +108,19 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
c := s.ooo.oooHeadChunk
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
if err != nil {
handleChunkWriteError(err)
return nil
}
for _, chk := range chks {
addChunk(c.minTime, c.maxTime, ref, chk.chunk)
}
} else {
var emptyChunk chunkenc.Chunk
addChunk(c.minTime, c.maxTime, ref, emptyChunk)
}
}
}
for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
@@ -341,14 +349,20 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
continue
}
var lastMmapRef chunks.ChunkDiskMapperRef
mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
}
if len(mmapRefs) == 0 {
lastMmapRef = 0
} else {
lastMmapRef = mmapRefs[len(mmapRefs)-1]
}
seq, off := lastMmapRef.Unpack()
if seq > lastSeq || (seq == lastSeq && off > lastOff) {
ch.lastMmapRef, lastSeq, lastOff = lastMmapRef, seq, off
}
if len(ms.ooo.oooMmappedChunks) > 0 {
ch.postings = append(ch.postings, seriesRef)