Mirror of https://github.com/prometheus/prometheus.git (synced 2025-03-05 20:59:13 -08:00)
commit 1f14f7e533

Merge branch 'main' into cedwards/ooo-native-histograms

# Conflicts:
#	tsdb/head_append.go
#	tsdb/ooo_head_read.go
@@ -20,6 +20,7 @@ import (
 	"math"
 	"os"
 	"runtime"
+	"slices"
 	"strings"
 	"testing"
 	"time"
@@ -152,12 +153,18 @@ func TestTSDBDump(t *testing.T) {
 			expectedMetrics, err := os.ReadFile(tt.expectedDump)
 			require.NoError(t, err)
 			expectedMetrics = normalizeNewLine(expectedMetrics)
-			// even though in case of one matcher samples are not sorted, the order in the cases above should stay the same.
-			require.Equal(t, string(expectedMetrics), dumpedMetrics)
+			// Sort both, because Prometheus does not guarantee the output order.
+			require.Equal(t, sortLines(string(expectedMetrics)), sortLines(dumpedMetrics))
 		})
 	}
 }
 
+func sortLines(buf string) string {
+	lines := strings.Split(buf, "\n")
+	slices.Sort(lines)
+	return strings.Join(lines, "\n")
+}
+
 func TestTSDBDumpOpenMetrics(t *testing.T) {
 	storage := promqltest.LoadedStorage(t, `
 	load 1m
@@ -169,7 +176,7 @@ func TestTSDBDumpOpenMetrics(t *testing.T) {
 	require.NoError(t, err)
 	expectedMetrics = normalizeNewLine(expectedMetrics)
 	dumpedMetrics := getDumpedSamples(t, storage.Dir(), math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics)
-	require.Equal(t, string(expectedMetrics), dumpedMetrics)
+	require.Equal(t, sortLines(string(expectedMetrics)), sortLines(dumpedMetrics))
 }
 
 func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) {
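The dump tests above now compare output in an order-insensitive way: both the expected file and the dumped output are passed through the new sortLines helper before the equality check. A minimal, self-contained sketch of the idea (the main function and the sample strings are illustrative, not part of the change):

package main

import (
	"fmt"
	"slices"
	"strings"
)

// sortLines returns buf with its lines sorted, so two dumps that contain the
// same lines in a different order compare as equal.
func sortLines(buf string) string {
	lines := strings.Split(buf, "\n")
	slices.Sort(lines)
	return strings.Join(lines, "\n")
}

func main() {
	a := "metric{l=\"1\"} 1\nmetric{l=\"2\"} 2\n"
	b := "metric{l=\"2\"} 2\nmetric{l=\"1\"} 1\n"
	fmt.Println(a == b)                       // false: raw strings differ only in line order
	fmt.Println(sortLines(a) == sortLines(b)) // true: same set of lines
}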
@@ -970,7 +970,7 @@ func TestEndpointsDiscoveryEmptyPodStatus(t *testing.T) {
 	}.Run(t)
 }
 
-// TestEndpointsUpdatePod makes sure that Endpoints discovery detects underlying Pods changes.
+// TestEndpointsDiscoveryUpdatePod makes sure that Endpoints discovery detects underlying Pods changes.
 // See https://github.com/prometheus/prometheus/issues/11305 for more details.
 func TestEndpointsDiscoveryUpdatePod(t *testing.T) {
 	pod := &v1.Pod{
@@ -617,7 +617,7 @@ Like `sort`, `sort_desc` only affects the results of instant queries, as range q
 
 ## `sort_by_label()`
 
-**This function has to be enabled via the [feature flag](../feature_flags.md) `--enable-feature=promql-experimental-functions`.**
+**This function has to be enabled via the [feature flag](../feature_flags.md#experimental-promql-functions) `--enable-feature=promql-experimental-functions`.**
 
 `sort_by_label(v instant-vector, label string, ...)` returns vector elements sorted by their label values and sample value in case of label values being equal, in ascending order.
 
@@ -627,7 +627,7 @@ This function uses [natural sort order](https://en.wikipedia.org/wiki/Natural_so
 
 ## `sort_by_label_desc()`
 
-**This function has to be enabled via the [feature flag](../feature_flags.md) `--enable-feature=promql-experimental-functions`.**
+**This function has to be enabled via the [feature flag](../feature_flags.md#experimental-promql-functions) `--enable-feature=promql-experimental-functions`.**
 
 Same as `sort_by_label`, but sorts in descending order.
 
@@ -676,7 +676,7 @@ over time and return an instant vector with per-series aggregation results:
 * `last_over_time(range-vector)`: the most recent point value in the specified interval.
 * `present_over_time(range-vector)`: the value 1 for any series in the specified interval.
 
-If the [feature flag](../feature_flags.md)
+If the [feature flag](../feature_flags.md#experimental-promql-functions)
 `--enable-feature=promql-experimental-functions` is set, the following
 additional functions are available:
 
@@ -2786,11 +2786,12 @@ type groupedAggregation struct {
 	heap vectorByValueHeap
 
 	// All bools together for better packing within the struct.
 	seen bool // Was this output groups seen in the input at this timestamp.
 	hasFloat bool // Has at least 1 float64 sample aggregated.
 	hasHistogram bool // Has at least 1 histogram sample aggregated.
-	groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group.
-	incrementalMean bool // True after reverting to incremental calculation of the mean value.
+	incompatibleHistograms bool // If true, group has seen mixed exponential and custom buckets, or incompatible custom buckets.
+	groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group.
+	incrementalMean bool // True after reverting to incremental calculation of the mean value.
 }
 
 // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
@@ -2814,10 +2815,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 		// Initialize this group if it's the first time we've seen it.
 		if !group.seen {
 			*group = groupedAggregation{
 				seen:       true,
 				floatValue: f,
 				floatMean:  f,
-				groupCount: 1,
+				incompatibleHistograms: false,
+				groupCount:             1,
 			}
 			switch op {
 			case parser.AVG, parser.SUM:
@@ -2838,6 +2840,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 			continue
 		}
 
+		if group.incompatibleHistograms {
+			continue
+		}
+
 		switch op {
 		case parser.SUM:
 			if h != nil {
@@ -2846,6 +2852,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 					_, err := group.histogramValue.Add(h)
 					if err != nil {
 						handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
+						group.incompatibleHistograms = true
 					}
 				}
 				// Otherwise the aggregation contained floats
@@ -2866,10 +2873,14 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 					toAdd, err := left.Sub(right)
 					if err != nil {
 						handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
+						group.incompatibleHistograms = true
+						continue
 					}
 					_, err = group.histogramValue.Add(toAdd)
 					if err != nil {
 						handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos)
+						group.incompatibleHistograms = true
+						continue
 					}
 				}
 				// Otherwise the aggregation contained floats
@@ -2966,6 +2977,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 				continue
 			}
 			switch {
+			case aggr.incompatibleHistograms:
+				continue
 			case aggr.hasHistogram:
 				aggr.histogramValue = aggr.histogramValue.Compact(0)
 			case aggr.incrementalMean:
@@ -2992,9 +3005,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
 				annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange()))
 				continue
 			}
-			if aggr.hasHistogram {
+			switch {
+			case aggr.incompatibleHistograms:
+				continue
+			case aggr.hasHistogram:
 				aggr.histogramValue.Compact(0)
-			} else {
+			default:
 				aggr.floatValue += aggr.floatKahanC
 			}
 		default:
@@ -3161,7 +3177,7 @@ seriesLoop:
 	return mat, annos
 }
 
-// aggregationK evaluates count_values on vec.
+// aggregationCountValues evaluates count_values on vec.
 // Outputs as many series per group as there are values in the input.
 func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
 	type groupCount struct {
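Taken together, the engine changes above poison an aggregation group the first time it sees histograms that cannot be combined (mixed exponential and custom buckets, or mismatched custom buckets): incompatibleHistograms is set, further samples for that group are skipped, and the group produces no output point. A small standalone sketch of this "mark the group and skip it" pattern, using illustrative types rather than the real engine structures:

package main

import (
	"errors"
	"fmt"
)

// group mirrors the idea of groupedAggregation: one accumulator per output
// series, plus a flag marking the group as unusable for this timestep.
type group struct {
	sum                    float64
	incompatibleHistograms bool
}

var errIncompatible = errors.New("incompatible histograms")

// add simulates aggregating one sample; a negative value stands in for a
// sample that cannot be combined with what the group already holds.
func (g *group) add(v float64) {
	if g.incompatibleHistograms {
		return // group already poisoned, skip further work
	}
	if v < 0 { // illustrative stand-in for a histogram Add/Sub error
		g.incompatibleHistograms = true
		return
	}
	g.sum += v
}

func main() {
	groups := map[string]*group{"a": {}, "b": {}}
	groups["a"].add(1)
	groups["a"].add(2)
	groups["b"].add(1)
	groups["b"].add(-1) // poisons group "b"
	groups["b"].add(5)  // ignored

	for name, g := range groups {
		if g.incompatibleHistograms {
			fmt.Println(name, "dropped:", errIncompatible)
			continue // no output point for this group
		}
		fmt.Println(name, "=", g.sum)
	}
}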
@@ -785,6 +785,8 @@ eval_warn instant at 1m rate(some_metric[30s])
 eval_warn instant at 30s rate(some_metric[30s])
     # Should produce no results.
 
+clear
+
 # Histogram with constant buckets.
 load 1m
   const_histogram {{schema:0 sum:1 count:1 buckets:[1 1 1]}} {{schema:0 sum:1 count:1 buckets:[1 1 1]}} {{schema:0 sum:1 count:1 buckets:[1 1 1]}} {{schema:0 sum:1 count:1 buckets:[1 1 1]}} {{schema:0 sum:1 count:1 buckets:[1 1 1]}}
@@ -828,3 +830,59 @@ eval instant at 5m histogram_stddev(rate(const_histogram[5m]))
 # Zero buckets mean no observations, so there is no standard variance.
 eval instant at 5m histogram_stdvar(rate(const_histogram[5m]))
     {} NaN
+
+clear
+
+# Test mixing exponential and custom buckets.
+load 6m
+  metric{series="exponential"} {{sum:4 count:3 buckets:[1 2 1]}} _ {{sum:4 count:3 buckets:[1 2 1]}}
+  metric{series="other-exponential"} {{sum:3 count:2 buckets:[1 1 1]}} _ {{sum:3 count:2 buckets:[1 1 1]}}
+  metric{series="custom"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}}
+  metric{series="other-custom"} _ {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}}
+
+# T=0: only exponential
+# T=6: only custom
+# T=12: mixed, should be ignored and emit a warning
+eval_warn range from 0 to 12m step 6m sum(metric)
+    {} {{sum:7 count:5 buckets:[2 3 2]}} {{schema:-53 sum:16 count:3 custom_values:[5 10] buckets:[1 2]}} _
+
+eval_warn range from 0 to 12m step 6m avg(metric)
+    {} {{sum:3.5 count:2.5 buckets:[1 1.5 1]}} {{schema:-53 sum:8 count:1.5 custom_values:[5 10] buckets:[0.5 1]}} _
+
+clear
+
+# Test incompatible custom bucket schemas.
+load 6m
+  metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}}
+  metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}}
+  metric{series="3"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}}
+
+# T=0: incompatible, should be ignored and emit a warning
+# T=6: compatible
+# T=12: incompatible followed by compatible, should be ignored and emit a warning
+eval_warn range from 0 to 12m step 6m sum(metric)
+    {} _ {{schema:-53 sum:2 count:2 custom_values:[5 10] buckets:[2]}} _
+
+eval_warn range from 0 to 12m step 6m avg(metric)
+    {} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} _
+
+clear
+
+load 1m
+  metric{group="just-floats", series="1"} 2
+  metric{group="just-floats", series="2"} 3
+  metric{group="just-exponential-histograms", series="1"} {{sum:3 count:4 buckets:[1 2 1]}}
+  metric{group="just-exponential-histograms", series="2"} {{sum:2 count:3 buckets:[1 1 1]}}
+  metric{group="just-custom-histograms", series="1"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}}
+  metric{group="just-custom-histograms", series="2"} {{schema:-53 sum:3 count:4 custom_values:[2] buckets:[7]}}
+  metric{group="floats-and-histograms", series="1"} 2
+  metric{group="floats-and-histograms", series="2"} {{sum:2 count:3 buckets:[1 1 1]}}
+  metric{group="exponential-and-custom-histograms", series="1"} {{sum:2 count:3 buckets:[1 1 1]}}
+  metric{group="exponential-and-custom-histograms", series="2"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}}
+  metric{group="incompatible-custom-histograms", series="1"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}}
+  metric{group="incompatible-custom-histograms", series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}}
+
+eval_warn instant at 0 sum by (group) (metric)
+    {group="just-floats"} 5
+    {group="just-exponential-histograms"} {{sum:5 count:7 buckets:[2 3 2]}}
+    {group="just-custom-histograms"} {{schema:-53 sum:4 count:5 custom_values:[2] buckets:[8]}}
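These test cases exercise the rule that custom-bucket histograms only aggregate cleanly when their bucket layouts agree; a sum across custom_values:[5 10] and custom_values:[2] is skipped and a warning is emitted. A simplified sketch of that compatibility check, assuming an exact match of boundaries is required (the real logic lives in the histogram package and is more involved):

package main

import (
	"fmt"
	"slices"
)

// compatibleCustomBuckets reports whether two custom-bucket histograms can be
// aggregated under this simplified rule: identical custom_values boundaries.
func compatibleCustomBuckets(a, b []float64) bool {
	return slices.Equal(a, b)
}

func main() {
	fmt.Println(compatibleCustomBuckets([]float64{5, 10}, []float64{5, 10})) // true: summable
	fmt.Println(compatibleCustomBuckets([]float64{5, 10}, []float64{2}))    // false: skipped with a warning
}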
@@ -72,7 +72,7 @@ func TestListSeriesIterator(t *testing.T) {
 	require.Equal(t, chunkenc.ValNone, it.Seek(2))
 }
 
-// TestSeriesSetToChunkSet test the property of SeriesSet that says
+// TestChunkSeriesSetToSeriesSet test the property of SeriesSet that says
 // returned series should be iterable even after Next is called.
 func TestChunkSeriesSetToSeriesSet(t *testing.T) {
 	series := []struct {
tsdb/db.go (71 lines changed)
@@ -2046,7 +2046,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
 		}
 	}
 
-	blockQueriers := make([]storage.Querier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers
+	blockQueriers := make([]storage.Querier, 0, len(blocks)+1) // +1 to allow for possible head querier.
 
 	defer func() {
 		if err != nil {
@@ -2058,10 +2058,12 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
 		}
 	}()
 
-	if maxt >= db.head.MinTime() {
+	overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
+	var headQuerier storage.Querier
+	if maxt >= db.head.MinTime() || overlapsOOO {
 		rh := NewRangeHead(db.head, mint, maxt)
 		var err error
-		inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
+		headQuerier, err = db.blockQuerierFunc(rh, mint, maxt)
 		if err != nil {
 			return nil, fmt.Errorf("open block querier for head %s: %w", rh, err)
 		}
@@ -2071,36 +2073,28 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
 		// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
 		shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
 		if shouldClose {
-			if err := inOrderHeadQuerier.Close(); err != nil {
+			if err := headQuerier.Close(); err != nil {
 				return nil, fmt.Errorf("closing head block querier %s: %w", rh, err)
 			}
-			inOrderHeadQuerier = nil
+			headQuerier = nil
 		}
 		if getNew {
 			rh := NewRangeHead(db.head, newMint, maxt)
-			inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt)
+			headQuerier, err = db.blockQuerierFunc(rh, newMint, maxt)
 			if err != nil {
 				return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
 			}
 		}
-
-		if inOrderHeadQuerier != nil {
-			blockQueriers = append(blockQueriers, inOrderHeadQuerier)
-		}
 	}
 
-	if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
-		rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
-		var err error
-		outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
-		if err != nil {
-			// If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead.
-			rh.isoState.Close()
-
-			return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err)
-		}
-
-		blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
+	if overlapsOOO {
+		// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
+		isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
+		headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier)
+	}
+
+	if headQuerier != nil {
+		blockQueriers = append(blockQueriers, headQuerier)
 	}
 
 	for _, b := range blocks {
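The Querier path above now opens at most one head querier and, when the requested range overlaps the out-of-order window, wraps it so that a single querier serves both in-order and OOO head data. A standalone sketch of that wrapping idea, using a deliberately simplified interface rather than the real storage.Querier API:

package main

import "fmt"

// Querier is a deliberately simplified stand-in for storage.Querier.
type Querier interface {
	Select() []string
}

type inOrderHead struct{}

func (inOrderHead) Select() []string { return []string{"in-order sample"} }

// headAndOOO wraps an optional in-order querier and adds OOO results,
// mirroring how NewHeadAndOOOQuerier wraps the head querier.
type headAndOOO struct {
	inner Querier // may be nil when only OOO data overlaps the range
}

func (q headAndOOO) Select() []string {
	out := []string{"ooo sample"}
	if q.inner != nil {
		out = append(out, q.inner.Select()...)
	}
	return out
}

func main() {
	var head Querier = inOrderHead{}
	overlapsOOO := true

	if overlapsOOO {
		head = headAndOOO{inner: head} // wrap instead of appending a second querier
	}
	fmt.Println(head.Select())
}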
@@ -2128,7 +2122,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
 		}
 	}
 
-	blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers
+	blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+1) // +1 to allow for possible head querier.
 
 	defer func() {
 		if err != nil {
@@ -2140,9 +2134,11 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
 		}
 	}()
 
-	if maxt >= db.head.MinTime() {
+	overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime())
+	var headQuerier storage.ChunkQuerier
+	if maxt >= db.head.MinTime() || overlapsOOO {
 		rh := NewRangeHead(db.head, mint, maxt)
-		inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
+		headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt)
 		if err != nil {
 			return nil, fmt.Errorf("open querier for head %s: %w", rh, err)
 		}
@@ -2152,35 +2148,28 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
 		// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
 		shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
 		if shouldClose {
-			if err := inOrderHeadQuerier.Close(); err != nil {
+			if err := headQuerier.Close(); err != nil {
 				return nil, fmt.Errorf("closing head querier %s: %w", rh, err)
 			}
-			inOrderHeadQuerier = nil
+			headQuerier = nil
 		}
 		if getNew {
 			rh := NewRangeHead(db.head, newMint, maxt)
-			inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt)
+			headQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt)
 			if err != nil {
 				return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
 			}
 		}
-
-		if inOrderHeadQuerier != nil {
-			blockQueriers = append(blockQueriers, inOrderHeadQuerier)
-		}
 	}
 
-	if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
-		rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
-		outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
-		if err != nil {
-			// If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead.
-			rh.isoState.Close()
-
-			return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err)
-		}
-
-		blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
+	if overlapsOOO {
+		// We need to fetch from in-order and out-of-order chunks: wrap the headQuerier.
+		isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef)
+		headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier)
+	}
+
+	if headQuerier != nil {
+		blockQueriers = append(blockQueriers, headQuerier)
 	}
 
 	for _, b := range blocks {
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"math"
 
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 
 	"github.com/prometheus/prometheus/model/exemplar"
@@ -1024,7 +1025,7 @@ func (a *headAppender) Commit() (err error) {
 				// Sample is OOO and OOO handling is enabled
 				// and the delta is within the OOO tolerance.
 				var mmapRefs []chunks.ChunkDiskMapperRef
-				ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax)
+				ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
 				if chunkCreated {
 					r, ok := oooMmapMarkers[series.ref]
 					if !ok || r != nil {
@@ -1120,7 +1121,7 @@ func (a *headAppender) Commit() (err error) {
 				// Sample is OOO and OOO handling is enabled
 				// and the delta is within the OOO tolerance.
 				var mmapRefs []chunks.ChunkDiskMapperRef
-				ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax)
+				ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
 				if chunkCreated {
 					r, ok := oooMmapMarkers[series.ref]
 					if !ok || r != nil {
@@ -1216,7 +1217,7 @@ func (a *headAppender) Commit() (err error) {
 				// Sample is OOO and OOO handling is enabled
 				// and the delta is within the OOO tolerance.
 				var mmapRefs []chunks.ChunkDiskMapperRef
-				ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax)
+				ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax, a.head.logger)
 				if chunkCreated {
 					r, ok := oooMmapMarkers[series.ref]
 					if !ok || r != nil {
@@ -1314,14 +1315,14 @@ func (a *headAppender) Commit() (err error) {
 }
 
 // insert is like append, except it inserts. Used for OOO samples.
-func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
+func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger log.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
 	if s.ooo == nil {
 		s.ooo = &memSeriesOOOFields{}
 	}
 	c := s.ooo.oooHeadChunk
 	if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
 		// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
-		c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
+		c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger)
 		chunkCreated = true
 	}
 
@@ -1675,9 +1676,9 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
 }
 
 // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
-// The caller must ensure that s.ooo is not nil.
-func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
-	ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper)
+// The caller must ensure that s is locked and s.ooo is not nil.
+func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
+	ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger)
 
 	s.ooo.oooHeadChunk = &oooHeadChunk{
 		chunk: NewOOOChunk(),
@@ -1688,7 +1689,8 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk
 	return s.ooo.oooHeadChunk, ref
 }
 
-func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef {
+// s must be locked when calling.
+func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) []chunks.ChunkDiskMapperRef {
 	if s.ooo == nil || s.ooo.oooHeadChunk == nil {
 		// OOO is not enabled or there is no head chunk, so nothing to m-map here.
 		return nil
@@ -1700,6 +1702,10 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap
 	}
 	chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, len(chks))
 	for _, memchunk := range chks {
+		if len(s.ooo.oooMmappedChunks) >= (oooChunkIDMask - 1) {
+			level.Error(logger).Log("msg", "Too many OOO chunks, dropping data", "series", s.lset.String())
+			break
+		}
 		chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true, handleChunkWriteError)
 		chunkRefs = append(chunkRefs, chunkRef)
 		s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
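The insert path above cuts a new OOO head chunk whenever the current one has reached oooCapMax samples, m-mapping the full chunk before a fresh one is started (and now logging and dropping data if the per-series chunk count would overflow the ID space). A tiny self-contained sketch of the "cut a new buffer when full" behaviour, with illustrative types standing in for the real memSeries and chunk machinery:

package main

import "fmt"

const capMax = 4 // stand-in for OutOfOrderCapMax

type chunk struct{ samples []int64 }

type series struct {
	head    *chunk
	archive []chunk // stand-in for the m-mapped chunks
}

// insert appends a timestamp, cutting a new head chunk when the current one is full.
func (s *series) insert(t int64) (chunkCreated bool) {
	if s.head == nil || len(s.head.samples) == capMax {
		if s.head != nil {
			s.archive = append(s.archive, *s.head) // "m-map" the full chunk
		}
		s.head = &chunk{}
		chunkCreated = true
	}
	s.head.samples = append(s.head.samples, t)
	return chunkCreated
}

func main() {
	var s series
	for t := int64(0); t < 10; t++ {
		if s.insert(t) {
			fmt.Println("cut new head chunk at t =", t)
		}
	}
	fmt.Println("archived chunks:", len(s.archive))
}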
@@ -199,13 +199,18 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
 	defer s.Unlock()
 
 	*chks = (*chks)[:0]
+	*chks = appendSeriesChunks(s, h.mint, h.maxt, *chks)
+
+	return nil
+}
+
+func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
 	for i, c := range s.mmappedChunks {
 		// Do not expose chunks that are outside of the specified range.
-		if !c.OverlapsClosedInterval(h.mint, h.maxt) {
+		if !c.OverlapsClosedInterval(mint, maxt) {
 			continue
 		}
-		*chks = append(*chks, chunks.Meta{
+		chks = append(chks, chunks.Meta{
 			MinTime: c.minTime,
 			MaxTime: c.maxTime,
 			Ref:     chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
@@ -223,8 +228,8 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
 		} else {
 			maxTime = chk.maxTime
 		}
-		if chk.OverlapsClosedInterval(h.mint, h.maxt) {
-			*chks = append(*chks, chunks.Meta{
+		if chk.OverlapsClosedInterval(mint, maxt) {
+			chks = append(chks, chunks.Meta{
 				MinTime: chk.minTime,
 				MaxTime: maxTime,
 				Ref:     chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))),
@@ -233,8 +238,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
 			j++
 		}
 	}
-	return nil
+	return chks
 }
 
 // headChunkID returns the HeadChunkID referred to by the given position.
@@ -244,12 +248,20 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
 	return chunks.HeadChunkID(pos) + s.firstChunkID
 }
 
+const oooChunkIDMask = 1 << 23
+
 // oooHeadChunkID returns the HeadChunkID referred to by the given position.
+// Only the bottom 24 bits are used. Bit 23 is always 1 for an OOO chunk; for the rest:
 // * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos]
 // * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk
 // The caller must ensure that s.ooo is not nil.
 func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
-	return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID
+	return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask
+}
+
+func unpackHeadChunkRef(ref chunks.ChunkRef) (seriesID chunks.HeadSeriesRef, chunkID chunks.HeadChunkID, isOOO bool) {
+	sid, cid := chunks.HeadChunkRef(ref).Unpack()
+	return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0
 }
 
 // LabelValueFor returns label value for the given label name in the series referred to by ID.
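With this change an OOO chunk ID always has bit 23 set, so a single HeadChunkID value carries both the chunk position and whether it refers to an in-order or an out-of-order chunk. A small sketch of the mask arithmetic mirrored by oooChunkIDMask and unpackHeadChunkRef, with a plain uint64 standing in for chunks.HeadChunkID:

package main

import "fmt"

const oooChunkIDMask = 1 << 23 // bit 23 marks an OOO chunk ID

// packOOO tags a chunk position as out-of-order.
func packOOO(pos uint64) uint64 { return pos | oooChunkIDMask }

// unpack returns the position and whether the ID was tagged as OOO.
func unpack(id uint64) (pos uint64, isOOO bool) {
	return id & (oooChunkIDMask - 1), id&oooChunkIDMask != 0
}

func main() {
	id := packOOO(5)
	pos, isOOO := unpack(id)
	fmt.Println(pos, isOOO) // 5 true

	pos, isOOO = unpack(7) // plain in-order ID
	fmt.Println(pos, isOOO) // 7 false
}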
@@ -339,10 +351,15 @@ func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chu
 	return chk, nil, err
 }
 
-// ChunkWithCopy returns the chunk for the reference number.
-// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk.
-func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) {
-	return h.chunk(meta, true)
+type ChunkReaderWithCopy interface {
+	ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error)
+}
+
+// ChunkOrIterableWithCopy returns the chunk for the reference number.
+// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk, plus the max time of the chunk.
+func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
+	chk, maxTime, err := h.chunk(meta, true)
+	return chk, nil, maxTime, err
 }
 
 // chunk returns the chunk for the reference number.
@@ -358,9 +375,14 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
 	}
 
 	s.Lock()
+	defer s.Unlock()
+	return h.chunkFromSeries(s, cid, copyLastChunk)
+}
+
+// Call with s locked.
+func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
 	c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
 	if err != nil {
-		s.Unlock()
 		return nil, 0, err
 	}
 	defer func() {
@@ -374,7 +396,6 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
 
 	// This means that the chunk is outside the specified range.
 	if !c.OverlapsClosedInterval(h.mint, h.maxt) {
-		s.Unlock()
 		return nil, 0, storage.ErrNotFound
 	}
 
@@ -391,7 +412,6 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
 			return nil, 0, err
 		}
 	}
-	s.Unlock()
 
 	return &safeHeadChunk{
 		Chunk: chk,
@@ -461,14 +481,15 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
 	return elem, true, offset == 0, nil
 }
 
-// oooMergedChunks return an iterable over one or more OOO chunks for the given
+// mergedChunks return an iterable over one or more OOO chunks for the given
 // chunks.Meta reference from memory or by m-mapping it from the disk. The
 // returned iterable will be a merge of all the overlapping chunks, if any,
 // amongst all the chunks in the OOOHead.
+// If hr is non-nil then in-order chunks are included.
 // This function is not thread safe unless the caller holds a lock.
 // The caller must ensure that s.ooo is not nil.
-func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) {
-	_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
+func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) {
+	_, cid, _ := unpackHeadChunkRef(meta.Ref)
 
 	// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
 	// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
@@ -509,6 +530,16 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
 		tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta})
 	}
 
+	if hr != nil { // Include in-order chunks.
+		metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil)
+		for _, m := range metas {
+			tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
+				meta: m,
+				ref:  0, // This tells the loop below it's an in-order head chunk.
+			})
+		}
+	}
+
 	// Next we want to sort all the collected chunks by min time so we can find
 	// those that overlap and stop when we know the rest don't.
 	slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef)
@@ -520,9 +551,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
 			continue
 		}
 		var iterable chunkenc.Iterable
-		if c.meta.Chunk != nil {
+		switch {
+		case c.meta.Chunk != nil:
 			iterable = c.meta.Chunk
-		} else {
+		case c.ref == 0: // This is an in-order head chunk.
+			_, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack()
+			var err error
+			iterable, _, err = hr.chunkFromSeries(s, cid, false)
+			if err != nil {
+				return nil, fmt.Errorf("invalid head chunk: %w", err)
+			}
+		default:
 			chk, err := cdm.Chunk(c.ref)
 			if err != nil {
 				var cerr *chunks.CorruptionErr
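mergedChunks collects the candidate chunks (OOO ones, plus in-order ones when a head chunk reader is supplied), sorts them by minimum time, and merges those that overlap the requested chunk. A standalone sketch of that sort-then-merge-overlapping step, using a simplified meta type rather than chunks.Meta:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

type meta struct{ minT, maxT int64 }

// mergeOverlapping sorts chunk metas by min time and coalesces those whose
// time ranges overlap, the same ordering trick mergedChunks relies on before
// building the merged iterable.
func mergeOverlapping(chks []meta) []meta {
	slices.SortFunc(chks, func(a, b meta) int { return cmp.Compare(a.minT, b.minT) })
	var out []meta
	for _, c := range chks {
		if n := len(out); n > 0 && c.minT <= out[n-1].maxT {
			if c.maxT > out[n-1].maxT {
				out[n-1].maxT = c.maxT // extend the current merged range
			}
			continue
		}
		out = append(out, c)
	}
	return out
}

func main() {
	chks := []meta{{10, 20}, {0, 5}, {4, 12}, {30, 40}}
	fmt.Println(mergeOverlapping(chks)) // [{0 20} {30 40}]
}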
@@ -5868,7 +5868,7 @@ func TestCuttingNewHeadChunks(t *testing.T) {
 	}
 }
 
-// TestHeadDetectsDuplcateSampleAtSizeLimit tests a regression where a duplicate sample
+// TestHeadDetectsDuplicateSampleAtSizeLimit tests a regression where a duplicate sample
 // is appended to the head, right when the head chunk is at the size limit.
 // The test adds all samples as duplicate, thus expecting that the result has
 // exactly half of the samples.
@@ -1006,7 +1006,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
 				unknownRefs++
 				continue
 			}
-			ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax)
+			ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
 			if chunkCreated {
 				h.metrics.chunksCreated.Inc()
 				h.metrics.chunks.Inc()
@@ -1033,9 +1033,9 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
 			var chunkCreated bool
 			var ok bool
 			if s.h != nil {
-				ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax)
+				ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger)
 			} else {
-				ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax)
+				ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger)
 			}
 			if chunkCreated {
 				h.metrics.chunksCreated.Inc()
@@ -14,16 +14,10 @@
 package tsdb
 
 import (
-	"fmt"
 	"sort"
 
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
-
-	"github.com/oklog/ulid"
-
-	"github.com/prometheus/prometheus/tsdb/chunks"
-	"github.com/prometheus/prometheus/tsdb/tombstones"
 )
 
 // OOOChunk maintains samples in time-ascending order.
@@ -171,75 +165,3 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
 	}
 	return chks, nil
 }
-
-var _ BlockReader = &OOORangeHead{}
-
-// OOORangeHead allows querying Head out of order samples via BlockReader
-// interface implementation.
-type OOORangeHead struct {
-	head *Head
-	// mint and maxt are tracked because when a query is handled we only want
-	// the timerange of the query and having preexisting pointers to the first
-	// and last timestamp help with that.
-	mint, maxt int64
-
-	isoState *oooIsolationState
-}
-
-func NewOOORangeHead(head *Head, mint, maxt int64, minRef chunks.ChunkDiskMapperRef) *OOORangeHead {
-	isoState := head.oooIso.TrackReadAfter(minRef)
-
-	return &OOORangeHead{
-		head:     head,
-		mint:     mint,
-		maxt:     maxt,
-		isoState: isoState,
-	}
-}
-
-func (oh *OOORangeHead) Index() (IndexReader, error) {
-	return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt, oh.isoState.minRef), nil
-}
-
-func (oh *OOORangeHead) Chunks() (ChunkReader, error) {
-	return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState, 0), nil
-}
-
-func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) {
-	// As stated in the design doc https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing
-	// Tombstones are not supported for out of order metrics.
-	return tombstones.NewMemTombstones(), nil
-}
-
-var oooRangeHeadULID = ulid.MustParse("0000000000XXXX000RANGEHEAD")
-
-func (oh *OOORangeHead) Meta() BlockMeta {
-	return BlockMeta{
-		MinTime: oh.mint,
-		MaxTime: oh.maxt,
-		ULID:    oooRangeHeadULID,
-		Stats: BlockStats{
-			NumSeries: oh.head.NumSeries(),
-		},
-	}
-}
-
-// Size returns the size taken by the Head block.
-func (oh *OOORangeHead) Size() int64 {
-	return oh.head.Size()
-}
-
-// String returns an human readable representation of the out of order range
-// head. It's important to keep this function in order to avoid the struct dump
-// when the head is stringified in errors or logs.
-func (oh *OOORangeHead) String() string {
-	return fmt.Sprintf("ooo range head (mint: %d, maxt: %d)", oh.MinTime(), oh.MaxTime())
-}
-
-func (oh *OOORangeHead) MinTime() int64 {
-	return oh.mint
-}
-
-func (oh *OOORangeHead) MaxTime() int64 {
-	return oh.maxt
-}
@@ -27,15 +27,10 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/tombstones"
+	"github.com/prometheus/prometheus/util/annotations"
 )
 
-// OOOHeadIndexReader implements IndexReader so ooo samples in the head can be
-// accessed.
-// It also has a reference to headIndexReader so we can leverage on its
-// IndexReader implementation for all the methods that remain the same. We
-// decided to do this to avoid code duplication.
-// The only methods that change are the ones about getting Series and Postings.
-type OOOHeadIndexReader struct {
+type HeadAndOOOIndexReader struct {
 	*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
 	lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
 }
@@ -49,17 +44,13 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator
 	return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
 }
 
-func NewOOOHeadIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *OOOHeadIndexReader {
+func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
 	hr := &headIndexReader{
 		head: head,
 		mint: mint,
 		maxt: maxt,
 	}
-	return &OOOHeadIndexReader{hr, lastGarbageCollectedMmapRef}
-}
-
-func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
-	return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0)
+	return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef}
 }
 
 type MultiChunk struct {
@@ -114,7 +105,7 @@ func (c MultiChunk) Reset([]byte) {
 //
 // maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
 // the oooHeadChunk will not be considered.
-func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef) error {
+func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
 	s := oh.head.series.getByID(chunks.HeadSeriesRef(ref))
 
 	if s == nil {
@@ -131,10 +122,19 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
 	defer s.Unlock()
 	*chks = (*chks)[:0]
 
-	if s.ooo == nil {
-		return nil
+	if s.ooo != nil {
+		return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
 	}
+	*chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks)
+	return nil
+}
+
+// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
+// any chunk at or before this ref will not be considered. 0 disables this check.
+//
+// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
+// the oooHeadChunk will not be considered.
+func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error {
 	tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
 
 	addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
@@ -149,7 +149,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
 	// Collect all chunks that overlap the query range.
 	if s.ooo.oooHeadChunk != nil {
 		c := s.ooo.oooHeadChunk
-		if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
+		if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 {
 			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
 			if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
 				headChunks := MultiChunk{}
@@ -170,12 +170,16 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
 	}
 	for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
 		c := s.ooo.oooMmappedChunks[i]
-		if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) {
+		if c.OverlapsClosedInterval(mint, maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) {
 			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
 			addChunk(c.minTime, c.maxTime, ref, nil)
 		}
 	}
 
+	if includeInOrder {
+		tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks)
+	}
+
 	// There is nothing to do if we did not collect any chunk.
 	if len(tmpChks) == 0 {
 		return nil
@@ -212,11 +216,10 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
     return nil
 }

-// LabelValues needs to be overridden from the headIndexReader implementation due
-// to the check that happens at the beginning where we make sure that the query
-// interval overlaps with the head minooot and maxooot.
-func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
-    if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() {
+// LabelValues needs to be overridden from the headIndexReader implementation
+// so we can return labels within either in-order range or ooo range.
+func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
+    if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() {
         return []string{}, nil
     }

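The rewritten guard returns early only when the query interval misses both the in-order head range and the out-of-order range. A small sketch of the same predicate with plain bounds (names are illustrative only, not the Head's accessor methods):

    package main

    import "fmt"

    // outsideHead reports whether the query interval [mint, maxt] lies entirely
    // outside both the in-order range [inMin, inMax] and the OOO range [oooMin, oooMax].
    func outsideHead(mint, maxt, inMin, inMax, oooMin, oooMax int64) bool {
        return (maxt < inMin && maxt < oooMin) || (mint > inMax && mint > oooMax)
    }

    func main() {
        // In-order data covers [1000, 2000], OOO data covers [500, 900].
        fmt.Println(outsideHead(100, 400, 1000, 2000, 500, 900)) // true: nothing to return
        fmt.Println(outsideHead(600, 700, 1000, 2000, 500, 900)) // false: overlaps the OOO range
    }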
@@ -268,41 +271,30 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
     }
 }

-func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
-    switch len(values) {
-    case 0:
-        return index.EmptyPostings(), nil
-    case 1:
-        return oh.head.postings.Get(name, values[0]), nil // TODO(ganesh) Also call GetOOOPostings
-    default:
-        // TODO(ganesh) We want to only return postings for out of order series.
-        res := make([]index.Postings, 0, len(values))
-        for _, value := range values {
-            res = append(res, oh.head.postings.Get(name, value)) // TODO(ganesh) Also call GetOOOPostings
-        }
-        return index.Merge(ctx, res...), nil
+type HeadAndOOOChunkReader struct {
+    head        *Head
+    mint, maxt  int64
+    cr          *headChunkReader // If nil, only read OOO chunks.
+    maxMmapRef  chunks.ChunkDiskMapperRef
+    oooIsoState *oooIsolationState
+}
+
+func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader {
+    return &HeadAndOOOChunkReader{
+        head:        head,
+        mint:        mint,
+        maxt:        maxt,
+        cr:          cr,
+        maxMmapRef:  maxMmapRef,
+        oooIsoState: oooIsoState,
     }
 }

-type OOOHeadChunkReader struct {
-    head       *Head
-    mint, maxt int64
-    isoState   *oooIsolationState
-    maxMmapRef chunks.ChunkDiskMapperRef
-}
-
-func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader {
-    return &OOOHeadChunkReader{
-        head:       head,
-        mint:       mint,
-        maxt:       maxt,
-        isoState:   isoState,
-        maxMmapRef: maxMmapRef,
-    }
-}
-
-func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
-    sid, _ := chunks.HeadChunkRef(meta.Ref).Unpack()
+func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
+    sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
+    if !isOOO {
+        return cr.cr.ChunkOrIterable(meta)
+    }

     s := cr.head.series.getByID(sid)
     // This means that the series has been garbage collected.
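The new reader decides between the in-order path and the OOO path by unpacking a flag from the chunk reference (unpackHeadChunkRef) and delegating to the wrapped in-order reader when the flag is not set. A toy sketch of that general idea, with a made-up bit layout that is not the repository's actual encoding (it assumes series IDs stay below 2^31 so they never collide with the flag bit):

    package main

    import "fmt"

    type chunkRef uint64

    const oooBit = 1 << 63 // hypothetical flag bit for "out-of-order chunk"

    func packRef(seriesID, chunkID uint32, isOOO bool) chunkRef {
        r := chunkRef(uint64(seriesID)<<32 | uint64(chunkID))
        if isOOO {
            r |= oooBit
        }
        return r
    }

    func unpackRef(r chunkRef) (seriesID, chunkID uint32, isOOO bool) {
        isOOO = r&oooBit != 0
        r &^= oooBit
        return uint32(r >> 32), uint32(r), isOOO
    }

    func main() {
        r := packRef(7, 42, true)
        sid, cid, isOOO := unpackRef(r)
        fmt.Println(sid, cid, isOOO) // 7 42 true
    }

A caller can then branch on isOOO exactly the way ChunkOrIterable does above: the in-order reader handles the common case, and only flagged references take the merged-chunks path.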
@@ -311,34 +303,35 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk,
     }

     s.Lock()
-    if s.ooo == nil {
-        // There is no OOO data for this series.
-        s.Unlock()
-        return nil, nil, storage.ErrNotFound
-    }
-    mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef)
+    mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
     s.Unlock()
-    if err != nil {
-        return nil, nil, err
-    }

-    // This means that the query range did not overlap with the requested chunk.
-    if len(mc.chunkIterables) == 0 {
-        return nil, nil, storage.ErrNotFound
-    }
-
-    return nil, mc, nil
+    return nil, mc, err
 }

-func (cr OOOHeadChunkReader) Close() error {
-    if cr.isoState != nil {
-        cr.isoState.Close()
+// ChunkOrIterableWithCopy: implements ChunkReaderWithCopy. The special Copy behaviour
+// is only implemented for the in-order head chunk.
+func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
+    _, _, isOOO := unpackHeadChunkRef(meta.Ref)
+    if !isOOO {
+        return cr.cr.ChunkOrIterableWithCopy(meta)
+    }
+    chk, iter, err := cr.ChunkOrIterable(meta)
+    return chk, iter, 0, err
+}
+
+func (cr *HeadAndOOOChunkReader) Close() error {
+    if cr.cr != nil && cr.cr.isoState != nil {
+        cr.cr.isoState.Close()
+    }
+    if cr.oooIsoState != nil {
+        cr.oooIsoState.Close()
     }
     return nil
 }

 type OOOCompactionHead struct {
-    oooIR       *OOOHeadIndexReader
+    head        *Head
     lastMmapRef chunks.ChunkDiskMapperRef
     lastWBLFile int
     postings    []storage.SeriesRef
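ChunkOrIterableWithCopy gives the combined reader an optional "copy the head chunk" capability that callers can probe for instead of relying on a concrete type. The sketch below shows the underlying Go pattern, an optional interface discovered by type assertion; every name in it is invented for the example:

    package main

    import "fmt"

    type reader interface {
        Chunk(id int) string
    }

    // readerWithCopy is the optional capability: readers that can hand out a
    // safe copy implement it in addition to the base interface.
    type readerWithCopy interface {
        reader
        ChunkWithCopy(id int) string
    }

    type plainReader struct{}

    func (plainReader) Chunk(id int) string { return fmt.Sprintf("chunk-%d", id) }

    type copyingReader struct{ plainReader }

    func (c copyingReader) ChunkWithCopy(id int) string { return c.Chunk(id) + " (copy)" }

    // read prefers the copying path when the reader supports it, and falls back otherwise.
    func read(r reader, id int) string {
        if rc, ok := r.(readerWithCopy); ok {
            return rc.ChunkWithCopy(id)
        }
        return r.Chunk(id)
    }

    func main() {
        fmt.Println(read(plainReader{}, 1))   // chunk-1
        fmt.Println(read(copyingReader{}, 1)) // chunk-1 (copy)
    }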
@@ -355,6 +348,7 @@ type OOOCompactionHead struct {
 // on the sample append latency. So call NewOOOCompactionHead only right before compaction.
 func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error) {
     ch := &OOOCompactionHead{
+        head:       head,
         chunkRange: head.chunkRange.Load(),
         mint:       math.MaxInt64,
         maxt:       math.MinInt64,
@@ -368,15 +362,14 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
         ch.lastWBLFile = lastWBLFile
     }

-    ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64, 0)
+    hr := headIndexReader{head: head, mint: ch.mint, maxt: ch.maxt}
     n, v := index.AllPostingsKey()
-    // TODO: verify this gets only ooo samples.
-    p, err := ch.oooIR.Postings(ctx, n, v)
+    // TODO: filter to series with OOO samples, before sorting.
+    p, err := hr.Postings(ctx, n, v)
     if err != nil {
         return nil, err
     }
-    p = ch.oooIR.SortedPostings(p)
+    p = hr.SortedPostings(p)

     var lastSeq, lastOff int
     for p.Next() {
@@ -397,7 +390,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
         }

         var lastMmapRef chunks.ChunkDiskMapperRef
-        mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
+        mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger)
         if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
             // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
             mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
@@ -433,7 +426,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) {
 }

 func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
-    return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil
+    return NewHeadAndOOOChunkReader(ch.head, ch.mint, ch.maxt, nil, nil, ch.lastMmapRef), nil
 }

 func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {
@@ -459,12 +452,12 @@ func (ch *OOOCompactionHead) Meta() BlockMeta {
 // Only the method of BlockReader interface are valid for the cloned OOOCompactionHead.
 func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead {
     return &OOOCompactionHead{
-        oooIR:       NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt, 0),
+        head:        ch.head,
         lastMmapRef: ch.lastMmapRef,
         postings:    ch.postings,
         chunkRange:  ch.chunkRange,
-        mint:        ch.mint,
-        maxt:        ch.maxt,
+        mint:        mint,
+        maxt:        maxt,
     }
 }

@@ -484,7 +477,8 @@ func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader {
 }

 func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter {
-    return ir.ch.oooIR.Symbols()
+    hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt}
+    return hr.Symbols()
 }

 func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error) {
@@ -505,11 +499,28 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P
 }

 func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
-    return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount)
+    hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt}
+    return hr.ShardedPostings(p, shardIndex, shardCount)
 }

 func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
-    return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef)
+    s := ir.ch.head.series.getByID(chunks.HeadSeriesRef(ref))
+
+    if s == nil {
+        ir.ch.head.metrics.seriesNotFound.Inc()
+        return storage.ErrNotFound
+    }
+    builder.Assign(s.labels())
+
+    s.Lock()
+    defer s.Unlock()
+    *chks = (*chks)[:0]
+
+    if s.ooo == nil {
+        return nil
+    }
+
+    return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, chks)
 }

 func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
@@ -537,5 +548,91 @@ func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, posti
 }

 func (ir *OOOCompactionHeadIndexReader) Close() error {
-    return ir.ch.oooIR.Close()
+    return nil
+}
+
+// HeadAndOOOQuerier queries both the head and the out-of-order head.
+type HeadAndOOOQuerier struct {
+    mint, maxt int64
+    head       *Head
+    index      IndexReader
+    chunkr     ChunkReader
+    querier    storage.Querier
+}
+
+func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier {
+    cr := &headChunkReader{
+        head:     head,
+        mint:     mint,
+        maxt:     maxt,
+        isoState: head.iso.State(mint, maxt),
+    }
+    return &HeadAndOOOQuerier{
+        mint:    mint,
+        maxt:    maxt,
+        head:    head,
+        index:   NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
+        chunkr:  NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
+        querier: querier,
+    }
+}
+
+func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+    return q.querier.LabelValues(ctx, name, hints, matchers...)
+}
+
+func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+    return q.querier.LabelNames(ctx, hints, matchers...)
+}
+
+func (q *HeadAndOOOQuerier) Close() error {
+    q.chunkr.Close()
+    return q.querier.Close()
+}
+
+func (q *HeadAndOOOQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
+    return selectSeriesSet(ctx, sortSeries, hints, matchers, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
+}
+
+// HeadAndOOOChunkQuerier queries both the head and the out-of-order head.
+type HeadAndOOOChunkQuerier struct {
+    mint, maxt int64
+    head       *Head
+    index      IndexReader
+    chunkr     ChunkReader
+    querier    storage.ChunkQuerier
+}
+
+func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier {
+    cr := &headChunkReader{
+        head:     head,
+        mint:     mint,
+        maxt:     maxt,
+        isoState: head.iso.State(mint, maxt),
+    }
+    return &HeadAndOOOChunkQuerier{
+        mint:    mint,
+        maxt:    maxt,
+        head:    head,
+        index:   NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef),
+        chunkr:  NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0),
+        querier: querier,
+    }
+}
+
+func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+    return q.querier.LabelValues(ctx, name, hints, matchers...)
+}
+
+func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+    return q.querier.LabelNames(ctx, hints, matchers...)
+}
+
+func (q *HeadAndOOOChunkQuerier) Close() error {
+    q.chunkr.Close()
+    return q.querier.Close()
+}
+
+func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
+    return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
 }
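HeadAndOOOQuerier and HeadAndOOOChunkQuerier wrap an inner querier, forward label lookups and Close to it, and take over only Select so one call can combine in-order and out-of-order data. A minimal sketch of that wrapper pattern with invented types (it is deliberately not the real storage.Querier interface):

    package main

    import "fmt"

    type querier interface {
        LabelNames() []string
        Select(matcher string) []string
        Close() error
    }

    type innerQuerier struct{}

    func (innerQuerier) LabelNames() []string           { return []string{"job", "instance"} }
    func (innerQuerier) Select(matcher string) []string { return []string{"in-order only"} }
    func (innerQuerier) Close() error                   { return nil }

    // combinedQuerier forwards label lookups, but answers Select itself so it
    // can merge in-order and out-of-order data into one result set.
    type combinedQuerier struct {
        inner querier
    }

    func (q combinedQuerier) LabelNames() []string { return q.inner.LabelNames() }
    func (q combinedQuerier) Close() error         { return q.inner.Close() }
    func (q combinedQuerier) Select(matcher string) []string {
        return []string{"in-order + out-of-order"}
    }

    func main() {
        var q querier = combinedQuerier{inner: innerQuerier{}}
        fmt.Println(q.LabelNames())   // [job instance]
        fmt.Println(q.Select("up"))   // [in-order + out-of-order]
        fmt.Println(q.Close() == nil) // true
    }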
@@ -35,7 +35,7 @@ import (
 var (
     _ chunkenc.Chunk    = &MultiChunk{}
     _ chunkenc.Iterable = &mergedOOOChunks{}
-    _ IndexReader       = &OOOHeadIndexReader{}
+    _ IndexReader       = &HeadAndOOOIndexReader{}
 )

 type chunkInterval struct {
@@ -323,7 +323,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
                 // Ref to whatever Ref the chunk has, that we refer to by ID
                 for ref, c := range intervals {
                     if c.ID == e.ID {
-                        meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), chunks.HeadChunkID(ref)))
+                        meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref)))
                         break
                     }
                 }
@@ -348,7 +348,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
             })
         }

-        ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT, 0)
+        ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0)

         var chks []chunks.Meta
         var b labels.ScratchBuilder
@@ -429,17 +429,17 @@ func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenari
             name:       "LabelValues calls with ooo head query range not overlapping out-of-order data",
             queryMinT:  100,
             queryMaxT:  100,
-            expValues1: []string{},
-            expValues2: []string{},
-            expValues3: []string{},
-            expValues4: []string{},
+            expValues1: []string{"bar1"},
+            expValues2: nil,
+            expValues3: []string{"bar1", "bar2"},
+            expValues4: []string{"bar1", "bar2"},
         },
     }

     for _, tc := range cases {
         t.Run(tc.name, func(t *testing.T) {
             // We first want to test using a head index reader that covers the biggest query interval
-            oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT, 0)
+            oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0)
             matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
             values, err := oh.LabelValues(ctx, "foo", matchers...)
             sort.Strings(values)
@@ -491,10 +491,10 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
     t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
         db := newTestDBWithOpts(t, opts)

-        cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0)
+        cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
         defer cr.Close()
         c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
-            Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
+            Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
         })
         require.Nil(t, iterable)
         require.Equal(t, err, fmt.Errorf("not found"))
@@ -842,14 +842,14 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {

             // The Series method populates the chunk metas, taking a copy of the
             // head OOO chunk if necessary. These are then used by the ChunkReader.
-            ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
+            ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
             var chks []chunks.Meta
             var b labels.ScratchBuilder
             err = ir.Series(s1Ref, &b, &chks)
             require.NoError(t, err)
             require.Equal(t, len(tc.expChunksSamples), len(chks))

-            cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
+            cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
             defer cr.Close()
             for i := 0; i < len(chks); i++ {
                 c, iterable, err := cr.ChunkOrIterable(chks[i])
@@ -1009,7 +1009,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(

             // The Series method populates the chunk metas, taking a copy of the
             // head OOO chunk if necessary. These are then used by the ChunkReader.
-            ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
+            ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
             var chks []chunks.Meta
             var b labels.ScratchBuilder
             err = ir.Series(s1Ref, &b, &chks)
@@ -1025,7 +1025,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
             }
             require.NoError(t, app.Commit())

-            cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0)
+            cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0)
             defer cr.Close()
             for i := 0; i < len(chks); i++ {
                 c, iterable, err := cr.ChunkOrIterable(chks[i])
@@ -115,20 +115,24 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
 }

 func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
-    mint := q.mint
-    maxt := q.maxt
+    return selectSeriesSet(ctx, sortSeries, hints, ms, q.index, q.chunks, q.tombstones, q.mint, q.maxt)
+}
+
+func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
+    index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
+) storage.SeriesSet {
     disableTrimming := false
     sharded := hints != nil && hints.ShardCount > 0

-    p, err := PostingsForMatchers(ctx, q.index, ms...)
+    p, err := PostingsForMatchers(ctx, index, ms...)
     if err != nil {
         return storage.ErrSeriesSet(err)
     }
     if sharded {
-        p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
+        p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
     }
     if sortSeries {
-        p = q.index.SortedPostings(p)
+        p = index.SortedPostings(p)
     }

     if hints != nil {
@@ -137,11 +141,11 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
         disableTrimming = hints.DisableTrimming
         if hints.Func == "series" {
             // When you're only looking up metadata (for example series API), you don't need to load any chunks.
-            return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming)
+            return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
         }
     }

-    return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
+    return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming)
 }

 // blockChunkQuerier provides chunk querying access to a single block database.
@@ -159,8 +163,12 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
 }

 func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
-    mint := q.mint
-    maxt := q.maxt
+    return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt)
+}
+
+func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
+    blockID ulid.ULID, index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
+) storage.ChunkSeriesSet {
     disableTrimming := false
     sharded := hints != nil && hints.ShardCount > 0

@@ -169,17 +177,17 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *
         maxt = hints.End
         disableTrimming = hints.DisableTrimming
     }
-    p, err := PostingsForMatchers(ctx, q.index, ms...)
+    p, err := PostingsForMatchers(ctx, index, ms...)
     if err != nil {
         return storage.ErrChunkSeriesSet(err)
     }
     if sharded {
-        p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
+        p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
     }
     if sortSeries {
-        p = q.index.SortedPostings(p)
+        p = index.SortedPostings(p)
     }
-    return NewBlockChunkSeriesSet(q.blockID, q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
+    return NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming)
 }

 // PostingsForMatchers assembles a single postings iterator against the index reader
@@ -633,14 +641,16 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool {
         }
     }

-    hcr, ok := p.cr.(*headChunkReader)
+    hcr, ok := p.cr.(ChunkReaderWithCopy)
     var iterable chunkenc.Iterable
     if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 {
-        // ChunkWithCopy will copy the head chunk.
+        // ChunkOrIterableWithCopy will copy the head chunk, if it can.
         var maxt int64
-        p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta)
-        // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here.
-        p.currMeta.MaxTime = maxt
+        p.currMeta.Chunk, iterable, maxt, p.err = hcr.ChunkOrIterableWithCopy(p.currMeta)
+        if p.currMeta.Chunk != nil {
+            // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here.
+            p.currMeta.MaxTime = maxt
+        }
     } else {
         p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta)
     }
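The Select bodies are extracted into free functions (selectSeriesSet, selectChunkSeriesSet) so that block queriers and the new head-and-OOO queriers can share a single pipeline: postings lookup, optional sharding and sorting, then series construction against whichever index and chunk readers were passed in. A simplified, self-contained sketch of that shape, with interfaces invented for the example rather than the TSDB's own:

    package main

    import (
        "fmt"
        "sort"
    )

    type index interface {
        Postings(matcher string) []int
    }

    type store interface {
        Series(id int) string
    }

    // selectSeries is the shared pipeline: look up postings, optionally sort
    // them, then resolve each posting against the store.
    func selectSeries(ix index, st store, matcher string, sorted bool) []string {
        ids := ix.Postings(matcher)
        if sorted {
            sort.Ints(ids)
        }
        out := make([]string, 0, len(ids))
        for _, id := range ids {
            out = append(out, st.Series(id))
        }
        return out
    }

    type memIndex map[string][]int

    func (m memIndex) Postings(matcher string) []int { return m[matcher] }

    type memStore map[int]string

    func (m memStore) Series(id int) string { return m[id] }

    func main() {
        ix := memIndex{"job=api": {3, 1, 2}}
        st := memStore{1: "s1", 2: "s2", 3: "s3"}
        fmt.Println(selectSeries(ix, st, "job=api", true)) // [s1 s2 s3]
    }

Because the pipeline takes its readers as parameters, any querier that can supply an index and a chunk reader, block-based or head-based, reuses the same code path.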
@@ -20,6 +20,7 @@ import (
     "testing"

     "github.com/prometheus/prometheus/model/labels"
+    "github.com/prometheus/prometheus/storage"
     "github.com/prometheus/prometheus/tsdb/index"

     "github.com/stretchr/testify/require"
@@ -254,56 +255,98 @@ func BenchmarkMergedStringIter(b *testing.B) {
     b.ReportAllocs()
 }

-func BenchmarkQuerierSelect(b *testing.B) {
-    opts := DefaultHeadOptions()
-    opts.ChunkRange = 1000
-    opts.ChunkDirRoot = b.TempDir()
-    h, err := NewHead(nil, nil, nil, nil, opts, nil)
+func createHeadForBenchmarkSelect(b *testing.B, numSeries int, addSeries func(app storage.Appender, i int)) (*Head, *DB) {
+    dir := b.TempDir()
+    opts := DefaultOptions()
+    opts.OutOfOrderCapMax = 255
+    opts.OutOfOrderTimeWindow = 1000
+    db, err := Open(dir, nil, nil, opts, nil)
     require.NoError(b, err)
-    defer h.Close()
+    b.Cleanup(func() {
+        require.NoError(b, db.Close())
+    })
+    h := db.Head()

     app := h.Appender(context.Background())
-    numSeries := 1000000
     for i := 0; i < numSeries; i++ {
-        app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
+        addSeries(app, i)
     }
     require.NoError(b, app.Commit())
+    return h, db
+}

-    bench := func(b *testing.B, br BlockReader, sorted bool) {
-        matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
-        for s := 1; s <= numSeries; s *= 10 {
-            b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
-                q, err := NewBlockQuerier(br, 0, int64(s-1))
-                require.NoError(b, err)
+func benchmarkSelect(b *testing.B, queryable storage.Queryable, numSeries int, sorted bool) {
+    matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
+    b.ResetTimer()
+    for s := 1; s <= numSeries; s *= 10 {
+        b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
+            q, err := queryable.Querier(0, int64(s-1))
+            require.NoError(b, err)

-                b.ResetTimer()
-                for i := 0; i < b.N; i++ {
-                    ss := q.Select(context.Background(), sorted, nil, matcher)
-                    for ss.Next() {
-                    }
-                    require.NoError(b, ss.Err())
-                }
-                q.Close()
-            })
-        }
+            b.ResetTimer()
+            for i := 0; i < b.N; i++ {
+                ss := q.Select(context.Background(), sorted, nil, matcher)
+                for ss.Next() {
+                }
+                require.NoError(b, ss.Err())
+            }
+            q.Close()
+        })
     }
+}
+
+func BenchmarkQuerierSelect(b *testing.B) {
+    numSeries := 1000000
+    h, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {
+        _, err := app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
+        if err != nil {
+            b.Fatal(err)
+        }
+    })

     b.Run("Head", func(b *testing.B) {
-        bench(b, h, false)
+        benchmarkSelect(b, db, numSeries, false)
     })
     b.Run("SortedHead", func(b *testing.B) {
-        bench(b, h, true)
+        benchmarkSelect(b, db, numSeries, true)
     })

-    tmpdir := b.TempDir()
-
-    blockdir := createBlockFromHead(b, tmpdir, h)
-    block, err := OpenBlock(nil, blockdir, nil)
-    require.NoError(b, err)
-    defer func() {
-        require.NoError(b, block.Close())
-    }()
-
     b.Run("Block", func(b *testing.B) {
-        bench(b, block, false)
+        tmpdir := b.TempDir()
+
+        blockdir := createBlockFromHead(b, tmpdir, h)
+        block, err := OpenBlock(nil, blockdir, nil)
+        require.NoError(b, err)
+        defer func() {
+            require.NoError(b, block.Close())
+        }()
+
+        benchmarkSelect(b, (*queryableBlock)(block), numSeries, false)
+    })
+}
+
+// Type wrapper to let a Block be a Queryable in benchmarkSelect().
+type queryableBlock Block
+
+func (pb *queryableBlock) Querier(mint, maxt int64) (storage.Querier, error) {
+    return NewBlockQuerier((*Block)(pb), mint, maxt)
+}
+
+func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) {
+    numSeries := 1000000
+    _, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {
+        l := labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix))
+        ref, err := app.Append(0, l, int64(i+1), 0)
+        if err != nil {
+            b.Fatal(err)
+        }
+        _, err = app.Append(ref, l, int64(i), 1) // Out of order sample
+        if err != nil {
+            b.Fatal(err)
+        }
+    })
+
+    b.Run("Head", func(b *testing.B) {
+        benchmarkSelect(b, db, numSeries, false)
     })
 }
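The new benchmark helper opens a full DB with out-of-order ingestion enabled instead of building a bare head. A minimal sketch of the same options used outside a test, assuming the tsdb package API as of this change (exact signatures can differ between Prometheus versions, so treat this as illustrative rather than definitive):

    package main

    import (
        "context"
        "log"

        "github.com/prometheus/prometheus/model/labels"
        "github.com/prometheus/prometheus/tsdb"
    )

    func main() {
        opts := tsdb.DefaultOptions()
        opts.OutOfOrderTimeWindow = 10 * 60 * 1000 // accept samples up to 10 minutes old (milliseconds)
        opts.OutOfOrderCapMax = 255                // max samples per in-memory OOO chunk

        db, err := tsdb.Open("./data", nil, nil, opts, nil)
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        app := db.Appender(context.Background())
        lbls := labels.FromStrings("__name__", "demo_metric", "job", "demo")
        if _, err := app.Append(0, lbls, 2000, 1); err != nil { // in-order sample
            log.Fatal(err)
        }
        if _, err := app.Append(0, lbls, 1000, 2); err != nil { // older timestamp, accepted as out-of-order
            log.Fatal(err)
        }
        if err := app.Commit(); err != nil {
            log.Fatal(err)
        }
    }

With the diff applied, the new benchmark could presumably be exercised with something like: go test ./tsdb/ -run '^$' -bench BenchmarkQuerierSelectWithOutOfOrder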
@@ -3169,12 +3169,11 @@ func BenchmarkQueries(b *testing.B) {

             qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples)
             require.NoError(b, err)
-            qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples, 0), 1, nSamples)
-            require.NoError(b, err)
+            isoState := head.oooIso.TrackReadAfter(0)
+            qOOOHead := NewHeadAndOOOQuerier(1, nSamples, head, isoState, qHead)

             queryTypes = append(queryTypes, qt{
-                fmt.Sprintf("_Head_oooPercent:%d", oooPercentage),
-                storage.NewMergeQuerier([]storage.Querier{qHead, qOOOHead}, nil, storage.ChainedSeriesMerge),
+                fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead,
             })
         }
