diff --git a/cmd/promtool/tsdb_test.go b/cmd/promtool/tsdb_test.go index 75089b168b..d7cc560881 100644 --- a/cmd/promtool/tsdb_test.go +++ b/cmd/promtool/tsdb_test.go @@ -20,6 +20,7 @@ import ( "math" "os" "runtime" + "slices" "strings" "testing" "time" @@ -152,12 +153,18 @@ func TestTSDBDump(t *testing.T) { expectedMetrics, err := os.ReadFile(tt.expectedDump) require.NoError(t, err) expectedMetrics = normalizeNewLine(expectedMetrics) - // even though in case of one matcher samples are not sorted, the order in the cases above should stay the same. - require.Equal(t, string(expectedMetrics), dumpedMetrics) + // Sort both, because Prometheus does not guarantee the output order. + require.Equal(t, sortLines(string(expectedMetrics)), sortLines(dumpedMetrics)) }) } } +func sortLines(buf string) string { + lines := strings.Split(buf, "\n") + slices.Sort(lines) + return strings.Join(lines, "\n") +} + func TestTSDBDumpOpenMetrics(t *testing.T) { storage := promqltest.LoadedStorage(t, ` load 1m @@ -169,7 +176,7 @@ func TestTSDBDumpOpenMetrics(t *testing.T) { require.NoError(t, err) expectedMetrics = normalizeNewLine(expectedMetrics) dumpedMetrics := getDumpedSamples(t, storage.Dir(), math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics) - require.Equal(t, string(expectedMetrics), dumpedMetrics) + require.Equal(t, sortLines(string(expectedMetrics)), sortLines(dumpedMetrics)) } func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) { diff --git a/discovery/kubernetes/endpoints_test.go b/discovery/kubernetes/endpoints_test.go index e877657dba..3ea98c5db9 100644 --- a/discovery/kubernetes/endpoints_test.go +++ b/discovery/kubernetes/endpoints_test.go @@ -970,7 +970,7 @@ func TestEndpointsDiscoveryEmptyPodStatus(t *testing.T) { }.Run(t) } -// TestEndpointsUpdatePod makes sure that Endpoints discovery detects underlying Pods changes. +// TestEndpointsDiscoveryUpdatePod makes sure that Endpoints discovery detects underlying Pods changes. // See https://github.com/prometheus/prometheus/issues/11305 for more details. func TestEndpointsDiscoveryUpdatePod(t *testing.T) { pod := &v1.Pod{ diff --git a/docs/querying/functions.md b/docs/querying/functions.md index ee81328b5e..bf2701b881 100644 --- a/docs/querying/functions.md +++ b/docs/querying/functions.md @@ -617,7 +617,7 @@ Like `sort`, `sort_desc` only affects the results of instant queries, as range q ## `sort_by_label()` -**This function has to be enabled via the [feature flag](../feature_flags.md) `--enable-feature=promql-experimental-functions`.** +**This function has to be enabled via the [feature flag](../feature_flags.md#experimental-promql-functions) `--enable-feature=promql-experimental-functions`.** `sort_by_label(v instant-vector, label string, ...)` returns vector elements sorted by their label values and sample value in case of label values being equal, in ascending order. @@ -627,7 +627,7 @@ This function uses [natural sort order](https://en.wikipedia.org/wiki/Natural_so ## `sort_by_label_desc()` -**This function has to be enabled via the [feature flag](../feature_flags.md) `--enable-feature=promql-experimental-functions`.** +**This function has to be enabled via the [feature flag](../feature_flags.md#experimental-promql-functions) `--enable-feature=promql-experimental-functions`.** Same as `sort_by_label`, but sorts in descending order. @@ -676,7 +676,7 @@ over time and return an instant vector with per-series aggregation results: * `last_over_time(range-vector)`: the most recent point value in the specified interval. * `present_over_time(range-vector)`: the value 1 for any series in the specified interval. -If the [feature flag](../feature_flags.md) +If the [feature flag](../feature_flags.md#experimental-promql-functions) `--enable-feature=promql-experimental-functions` is set, the following additional functions are available: diff --git a/promql/engine.go b/promql/engine.go index 9d814f10ba..daa46ab6f2 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2786,11 +2786,12 @@ type groupedAggregation struct { heap vectorByValueHeap // All bools together for better packing within the struct. - seen bool // Was this output groups seen in the input at this timestamp. - hasFloat bool // Has at least 1 float64 sample aggregated. - hasHistogram bool // Has at least 1 histogram sample aggregated. - groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group. - incrementalMean bool // True after reverting to incremental calculation of the mean value. + seen bool // Was this output groups seen in the input at this timestamp. + hasFloat bool // Has at least 1 float64 sample aggregated. + hasHistogram bool // Has at least 1 histogram sample aggregated. + incompatibleHistograms bool // If true, group has seen mixed exponential and custom buckets, or incompatible custom buckets. + groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group. + incrementalMean bool // True after reverting to incremental calculation of the mean value. } // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. @@ -2814,10 +2815,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // Initialize this group if it's the first time we've seen it. if !group.seen { *group = groupedAggregation{ - seen: true, - floatValue: f, - floatMean: f, - groupCount: 1, + seen: true, + floatValue: f, + floatMean: f, + incompatibleHistograms: false, + groupCount: 1, } switch op { case parser.AVG, parser.SUM: @@ -2838,6 +2840,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix continue } + if group.incompatibleHistograms { + continue + } + switch op { case parser.SUM: if h != nil { @@ -2846,6 +2852,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix _, err := group.histogramValue.Add(h) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) + group.incompatibleHistograms = true } } // Otherwise the aggregation contained floats @@ -2866,10 +2873,14 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix toAdd, err := left.Sub(right) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) + group.incompatibleHistograms = true + continue } _, err = group.histogramValue.Add(toAdd) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) + group.incompatibleHistograms = true + continue } } // Otherwise the aggregation contained floats @@ -2966,6 +2977,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix continue } switch { + case aggr.incompatibleHistograms: + continue case aggr.hasHistogram: aggr.histogramValue = aggr.histogramValue.Compact(0) case aggr.incrementalMean: @@ -2992,9 +3005,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) continue } - if aggr.hasHistogram { + switch { + case aggr.incompatibleHistograms: + continue + case aggr.hasHistogram: aggr.histogramValue.Compact(0) - } else { + default: aggr.floatValue += aggr.floatKahanC } default: @@ -3161,7 +3177,7 @@ seriesLoop: return mat, annos } -// aggregationK evaluates count_values on vec. +// aggregationCountValues evaluates count_values on vec. // Outputs as many series per group as there are values in the input. func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) { type groupCount struct { diff --git a/promql/promqltest/testdata/native_histograms.test b/promql/promqltest/testdata/native_histograms.test index c2a5012e96..ea838f1e32 100644 --- a/promql/promqltest/testdata/native_histograms.test +++ b/promql/promqltest/testdata/native_histograms.test @@ -785,6 +785,8 @@ eval_warn instant at 1m rate(some_metric[30s]) eval_warn instant at 30s rate(some_metric[30s]) # Should produce no results. +clear + # Histogram with constant buckets. load 1m const_histogram {{schema:0 sum:1 count:1 buckets:[1 1 1]}} {{schema:0 sum:1 count:1 buckets:[1 1 1]}} {{schema:0 sum:1 count:1 buckets:[1 1 1]}} {{schema:0 sum:1 count:1 buckets:[1 1 1]}} {{schema:0 sum:1 count:1 buckets:[1 1 1]}} @@ -828,3 +830,59 @@ eval instant at 5m histogram_stddev(rate(const_histogram[5m])) # Zero buckets mean no observations, so there is no standard variance. eval instant at 5m histogram_stdvar(rate(const_histogram[5m])) {} NaN + +clear + +# Test mixing exponential and custom buckets. +load 6m + metric{series="exponential"} {{sum:4 count:3 buckets:[1 2 1]}} _ {{sum:4 count:3 buckets:[1 2 1]}} + metric{series="other-exponential"} {{sum:3 count:2 buckets:[1 1 1]}} _ {{sum:3 count:2 buckets:[1 1 1]}} + metric{series="custom"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + metric{series="other-custom"} _ {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} + +# T=0: only exponential +# T=6: only custom +# T=12: mixed, should be ignored and emit a warning +eval_warn range from 0 to 12m step 6m sum(metric) + {} {{sum:7 count:5 buckets:[2 3 2]}} {{schema:-53 sum:16 count:3 custom_values:[5 10] buckets:[1 2]}} _ + +eval_warn range from 0 to 12m step 6m avg(metric) + {} {{sum:3.5 count:2.5 buckets:[1 1.5 1]}} {{schema:-53 sum:8 count:1.5 custom_values:[5 10] buckets:[0.5 1]}} _ + +clear + +# Test incompatible custom bucket schemas. +load 6m + metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} + metric{series="3"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + +# T=0: incompatible, should be ignored and emit a warning +# T=6: compatible +# T=12: incompatible followed by compatible, should be ignored and emit a warning +eval_warn range from 0 to 12m step 6m sum(metric) + {} _ {{schema:-53 sum:2 count:2 custom_values:[5 10] buckets:[2]}} _ + +eval_warn range from 0 to 12m step 6m avg(metric) + {} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} _ + +clear + +load 1m + metric{group="just-floats", series="1"} 2 + metric{group="just-floats", series="2"} 3 + metric{group="just-exponential-histograms", series="1"} {{sum:3 count:4 buckets:[1 2 1]}} + metric{group="just-exponential-histograms", series="2"} {{sum:2 count:3 buckets:[1 1 1]}} + metric{group="just-custom-histograms", series="1"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} + metric{group="just-custom-histograms", series="2"} {{schema:-53 sum:3 count:4 custom_values:[2] buckets:[7]}} + metric{group="floats-and-histograms", series="1"} 2 + metric{group="floats-and-histograms", series="2"} {{sum:2 count:3 buckets:[1 1 1]}} + metric{group="exponential-and-custom-histograms", series="1"} {{sum:2 count:3 buckets:[1 1 1]}} + metric{group="exponential-and-custom-histograms", series="2"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + metric{group="incompatible-custom-histograms", series="1"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} + metric{group="incompatible-custom-histograms", series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} + +eval_warn instant at 0 sum by (group) (metric) + {group="just-floats"} 5 + {group="just-exponential-histograms"} {{sum:5 count:7 buckets:[2 3 2]}} + {group="just-custom-histograms"} {{schema:-53 sum:4 count:5 custom_values:[2] buckets:[8]}} diff --git a/storage/series_test.go b/storage/series_test.go index 51886f409b..f8ba2af67c 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -72,7 +72,7 @@ func TestListSeriesIterator(t *testing.T) { require.Equal(t, chunkenc.ValNone, it.Seek(2)) } -// TestSeriesSetToChunkSet test the property of SeriesSet that says +// TestChunkSeriesSetToSeriesSet test the property of SeriesSet that says // returned series should be iterable even after Next is called. func TestChunkSeriesSetToSeriesSet(t *testing.T) { series := []struct { diff --git a/tsdb/db.go b/tsdb/db.go index 610f5e7ef6..908f12bd3e 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -2046,7 +2046,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } } - blockQueriers := make([]storage.Querier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers + blockQueriers := make([]storage.Querier, 0, len(blocks)+1) // +1 to allow for possible head querier. defer func() { if err != nil { @@ -2058,10 +2058,12 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } }() - if maxt >= db.head.MinTime() { + overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) + var headQuerier storage.Querier + if maxt >= db.head.MinTime() || overlapsOOO { rh := NewRangeHead(db.head, mint, maxt) var err error - inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head %s: %w", rh, err) } @@ -2071,36 +2073,28 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { // won't run into a race later since any truncation that comes after will wait on this querier if it overlaps. shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt) if shouldClose { - if err := inOrderHeadQuerier.Close(); err != nil { + if err := headQuerier.Close(); err != nil { return nil, fmt.Errorf("closing head block querier %s: %w", rh, err) } - inOrderHeadQuerier = nil + headQuerier = nil } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) + headQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err) } } - - if inOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, inOrderHeadQuerier) - } } - if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { - rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - var err error - outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) - if err != nil { - // If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead. - rh.isoState.Close() + if overlapsOOO { + // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. + isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) + headQuerier = NewHeadAndOOOQuerier(mint, maxt, db.head, isoState, headQuerier) + } - return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err) - } - - blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) + if headQuerier != nil { + blockQueriers = append(blockQueriers, headQuerier) } for _, b := range blocks { @@ -2128,7 +2122,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } } - blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+2) // +2 to allow for possible in-order and OOO head queriers + blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)+1) // +1 to allow for possible head querier. defer func() { if err != nil { @@ -2140,9 +2134,11 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } }() - if maxt >= db.head.MinTime() { + overlapsOOO := overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) + var headQuerier storage.ChunkQuerier + if maxt >= db.head.MinTime() || overlapsOOO { rh := NewRangeHead(db.head, mint, maxt) - inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) + headQuerier, err = db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head %s: %w", rh, err) } @@ -2152,35 +2148,28 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer // won't run into a race later since any truncation that comes after will wait on this querier if it overlaps. shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt) if shouldClose { - if err := inOrderHeadQuerier.Close(); err != nil { + if err := headQuerier.Close(); err != nil { return nil, fmt.Errorf("closing head querier %s: %w", rh, err) } - inOrderHeadQuerier = nil + headQuerier = nil } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) + headQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err) } } - - if inOrderHeadQuerier != nil { - blockQueriers = append(blockQueriers, inOrderHeadQuerier) - } } - if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { - rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) - if err != nil { - // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. - rh.isoState.Close() + if overlapsOOO { + // We need to fetch from in-order and out-of-order chunks: wrap the headQuerier. + isoState := db.head.oooIso.TrackReadAfter(db.lastGarbageCollectedMmapRef) + headQuerier = NewHeadAndOOOChunkQuerier(mint, maxt, db.head, isoState, headQuerier) + } - return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err) - } - - blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) + if headQuerier != nil { + blockQueriers = append(blockQueriers, headQuerier) } for _, b := range blocks { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 9e5bebaac4..bca5d4e815 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -19,6 +19,7 @@ import ( "fmt" "math" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/exemplar" @@ -1024,7 +1025,7 @@ func (a *headAppender) Commit() (err error) { // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax) + ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger) if chunkCreated { r, ok := oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1120,7 +1121,7 @@ func (a *headAppender) Commit() (err error) { // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax) + ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger) if chunkCreated { r, ok := oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1216,7 +1217,7 @@ func (a *headAppender) Commit() (err error) { // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax) + ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, oooCapMax, a.head.logger) if chunkCreated { r, ok := oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1314,14 +1315,14 @@ func (a *headAppender) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger log.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } c := s.ooo.oooHeadChunk if c == nil || c.chunk.NumSamples() == int(oooCapMax) { // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. - c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper) + c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger) chunkCreated = true } @@ -1675,9 +1676,9 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange } // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk. -// The caller must ensure that s.ooo is not nil. -func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { - ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper) +// The caller must ensure that s is locked and s.ooo is not nil. +func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { + ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger) s.ooo.oooHeadChunk = &oooHeadChunk{ chunk: NewOOOChunk(), @@ -1688,7 +1689,8 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk return s.ooo.oooHeadChunk, ref } -func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef { +// s must be locked when calling. +func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) []chunks.ChunkDiskMapperRef { if s.ooo == nil || s.ooo.oooHeadChunk == nil { // OOO is not enabled or there is no head chunk, so nothing to m-map here. return nil @@ -1700,6 +1702,10 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap } chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, len(chks)) for _, memchunk := range chks { + if len(s.ooo.oooMmappedChunks) >= (oooChunkIDMask - 1) { + level.Error(logger).Log("msg", "Too many OOO chunks, dropping data", "series", s.lset.String()) + break + } chunkRef := chunkDiskMapper.WriteChunk(s.ref, memchunk.minTime, memchunk.maxTime, memchunk.chunk, true, handleChunkWriteError) chunkRefs = append(chunkRefs, chunkRef) s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ diff --git a/tsdb/head_read.go b/tsdb/head_read.go index c8b394be8a..47f12df994 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -199,13 +199,18 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB defer s.Unlock() *chks = (*chks)[:0] + *chks = appendSeriesChunks(s, h.mint, h.maxt, *chks) + return nil +} + +func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta { for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. - if !c.OverlapsClosedInterval(h.mint, h.maxt) { + if !c.OverlapsClosedInterval(mint, maxt) { continue } - *chks = append(*chks, chunks.Meta{ + chks = append(chks, chunks.Meta{ MinTime: c.minTime, MaxTime: c.maxTime, Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))), @@ -223,8 +228,8 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB } else { maxTime = chk.maxTime } - if chk.OverlapsClosedInterval(h.mint, h.maxt) { - *chks = append(*chks, chunks.Meta{ + if chk.OverlapsClosedInterval(mint, maxt) { + chks = append(chks, chunks.Meta{ MinTime: chk.minTime, MaxTime: maxTime, Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))), @@ -233,8 +238,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB j++ } } - - return nil + return chks } // headChunkID returns the HeadChunkID referred to by the given position. @@ -244,12 +248,20 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID { return chunks.HeadChunkID(pos) + s.firstChunkID } +const oooChunkIDMask = 1 << 23 + // oooHeadChunkID returns the HeadChunkID referred to by the given position. +// Only the bottom 24 bits are used. Bit 23 is always 1 for an OOO chunk; for the rest: // * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos] // * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk // The caller must ensure that s.ooo is not nil. func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID { - return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID + return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask +} + +func unpackHeadChunkRef(ref chunks.ChunkRef) (seriesID chunks.HeadSeriesRef, chunkID chunks.HeadChunkID, isOOO bool) { + sid, cid := chunks.HeadChunkRef(ref).Unpack() + return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0 } // LabelValueFor returns label value for the given label name in the series referred to by ID. @@ -339,10 +351,15 @@ func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chu return chk, nil, err } -// ChunkWithCopy returns the chunk for the reference number. -// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk. -func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) { - return h.chunk(meta, true) +type ChunkReaderWithCopy interface { + ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) +} + +// ChunkOrIterableWithCopy returns the chunk for the reference number. +// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk, plus the max time of the chunk. +func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { + chk, maxTime, err := h.chunk(meta, true) + return chk, nil, maxTime, err } // chunk returns the chunk for the reference number. @@ -358,9 +375,14 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. } s.Lock() + defer s.Unlock() + return h.chunkFromSeries(s, cid, copyLastChunk) +} + +// Call with s locked. +func (h *headChunkReader) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, copyLastChunk bool) (chunkenc.Chunk, int64, error) { c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) if err != nil { - s.Unlock() return nil, 0, err } defer func() { @@ -374,7 +396,6 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. // This means that the chunk is outside the specified range. if !c.OverlapsClosedInterval(h.mint, h.maxt) { - s.Unlock() return nil, 0, storage.ErrNotFound } @@ -391,7 +412,6 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc. return nil, 0, err } } - s.Unlock() return &safeHeadChunk{ Chunk: chk, @@ -461,14 +481,15 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi return elem, true, offset == 0, nil } -// oooMergedChunks return an iterable over one or more OOO chunks for the given +// mergedChunks return an iterable over one or more OOO chunks for the given // chunks.Meta reference from memory or by m-mapping it from the disk. The // returned iterable will be a merge of all the overlapping chunks, if any, // amongst all the chunks in the OOOHead. +// If hr is non-nil then in-order chunks are included. // This function is not thread safe unless the caller holds a lock. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) { - _, cid := chunks.HeadChunkRef(meta.Ref).Unpack() +func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) { + _, cid, _ := unpackHeadChunkRef(meta.Ref) // ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are // incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index. @@ -509,6 +530,16 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta}) } + if hr != nil { // Include in-order chunks. + metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil) + for _, m := range metas { + tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ + meta: m, + ref: 0, // This tells the loop below it's an in-order head chunk. + }) + } + } + // Next we want to sort all the collected chunks by min time so we can find // those that overlap and stop when we know the rest don't. slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef) @@ -520,9 +551,17 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe continue } var iterable chunkenc.Iterable - if c.meta.Chunk != nil { + switch { + case c.meta.Chunk != nil: iterable = c.meta.Chunk - } else { + case c.ref == 0: // This is an in-order head chunk. + _, cid := chunks.HeadChunkRef(c.meta.Ref).Unpack() + var err error + iterable, _, err = hr.chunkFromSeries(s, cid, false) + if err != nil { + return nil, fmt.Errorf("invalid head chunk: %w", err) + } + default: chk, err := cdm.Chunk(c.ref) if err != nil { var cerr *chunks.CorruptionErr diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 02cc310de3..fc412428fb 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5868,7 +5868,7 @@ func TestCuttingNewHeadChunks(t *testing.T) { } } -// TestHeadDetectsDuplcateSampleAtSizeLimit tests a regression where a duplicate sample +// TestHeadDetectsDuplicateSampleAtSizeLimit tests a regression where a duplicate sample // is appended to the head, right when the head chunk is at the size limit. // The test adds all samples as duplicate, thus expecting that the result has // exactly half of the samples. diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 01e33e243b..5e804eec43 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -1006,7 +1006,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi unknownRefs++ continue } - ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax) + ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -1033,9 +1033,9 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi var chunkCreated bool var ok bool if s.h != nil { - ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax) + ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger) } else { - ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax) + ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger) } if chunkCreated { h.metrics.chunksCreated.Inc() diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 209b14673c..0ed9f36484 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -14,16 +14,10 @@ package tsdb import ( - "fmt" "sort" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" - - "github.com/oklog/ulid" - - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/tombstones" ) // OOOChunk maintains samples in time-ascending order. @@ -171,75 +165,3 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error } return chks, nil } - -var _ BlockReader = &OOORangeHead{} - -// OOORangeHead allows querying Head out of order samples via BlockReader -// interface implementation. -type OOORangeHead struct { - head *Head - // mint and maxt are tracked because when a query is handled we only want - // the timerange of the query and having preexisting pointers to the first - // and last timestamp help with that. - mint, maxt int64 - - isoState *oooIsolationState -} - -func NewOOORangeHead(head *Head, mint, maxt int64, minRef chunks.ChunkDiskMapperRef) *OOORangeHead { - isoState := head.oooIso.TrackReadAfter(minRef) - - return &OOORangeHead{ - head: head, - mint: mint, - maxt: maxt, - isoState: isoState, - } -} - -func (oh *OOORangeHead) Index() (IndexReader, error) { - return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt, oh.isoState.minRef), nil -} - -func (oh *OOORangeHead) Chunks() (ChunkReader, error) { - return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt, oh.isoState, 0), nil -} - -func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) { - // As stated in the design doc https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing - // Tombstones are not supported for out of order metrics. - return tombstones.NewMemTombstones(), nil -} - -var oooRangeHeadULID = ulid.MustParse("0000000000XXXX000RANGEHEAD") - -func (oh *OOORangeHead) Meta() BlockMeta { - return BlockMeta{ - MinTime: oh.mint, - MaxTime: oh.maxt, - ULID: oooRangeHeadULID, - Stats: BlockStats{ - NumSeries: oh.head.NumSeries(), - }, - } -} - -// Size returns the size taken by the Head block. -func (oh *OOORangeHead) Size() int64 { - return oh.head.Size() -} - -// String returns an human readable representation of the out of order range -// head. It's important to keep this function in order to avoid the struct dump -// when the head is stringified in errors or logs. -func (oh *OOORangeHead) String() string { - return fmt.Sprintf("ooo range head (mint: %d, maxt: %d)", oh.MinTime(), oh.MaxTime()) -} - -func (oh *OOORangeHead) MinTime() int64 { - return oh.mint -} - -func (oh *OOORangeHead) MaxTime() int64 { - return oh.maxt -} diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index ad640c7814..ae6f70190f 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -27,15 +27,10 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" ) -// OOOHeadIndexReader implements IndexReader so ooo samples in the head can be -// accessed. -// It also has a reference to headIndexReader so we can leverage on its -// IndexReader implementation for all the methods that remain the same. We -// decided to do this to avoid code duplication. -// The only methods that change are the ones about getting Series and Postings. -type OOOHeadIndexReader struct { +type HeadAndOOOIndexReader struct { *headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible. lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef } @@ -49,17 +44,13 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables) } -func NewOOOHeadIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *OOOHeadIndexReader { +func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader { hr := &headIndexReader{ head: head, mint: mint, maxt: maxt, } - return &OOOHeadIndexReader{hr, lastGarbageCollectedMmapRef} -} - -func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { - return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0) + return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef} } type MultiChunk struct { @@ -114,7 +105,7 @@ func (c MultiChunk) Reset([]byte) { // // maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then // the oooHeadChunk will not be considered. -func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef) error { +func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s := oh.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { @@ -131,10 +122,19 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra defer s.Unlock() *chks = (*chks)[:0] - if s.ooo == nil { - return nil + if s.ooo != nil { + return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) } + *chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks) + return nil +} +// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so +// any chunk at or before this ref will not be considered. 0 disables this check. +// +// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then +// the oooHeadChunk will not be considered. +func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, chks *[]chunks.Meta) error { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -149,7 +149,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra // Collect all chunks that overlap the query range. if s.ooo.oooHeadChunk != nil { c := s.ooo.oooHeadChunk - if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 { + if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. headChunks := MultiChunk{} @@ -170,12 +170,16 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra } for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- { c := s.ooo.oooMmappedChunks[i] - if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) { + if c.OverlapsClosedInterval(mint, maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))) addChunk(c.minTime, c.maxTime, ref, nil) } } + if includeInOrder { + tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks) + } + // There is nothing to do if we did not collect any chunk. if len(tmpChks) == 0 { return nil @@ -212,11 +216,10 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra return nil } -// LabelValues needs to be overridden from the headIndexReader implementation due -// to the check that happens at the beginning where we make sure that the query -// interval overlaps with the head minooot and maxooot. -func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { - if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() { +// LabelValues needs to be overridden from the headIndexReader implementation +// so we can return labels within either in-order range or ooo range. +func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() { return []string{}, nil } @@ -268,41 +271,30 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int { } } -func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { - switch len(values) { - case 0: - return index.EmptyPostings(), nil - case 1: - return oh.head.postings.Get(name, values[0]), nil // TODO(ganesh) Also call GetOOOPostings - default: - // TODO(ganesh) We want to only return postings for out of order series. - res := make([]index.Postings, 0, len(values)) - for _, value := range values { - res = append(res, oh.head.postings.Get(name, value)) // TODO(ganesh) Also call GetOOOPostings - } - return index.Merge(ctx, res...), nil +type HeadAndOOOChunkReader struct { + head *Head + mint, maxt int64 + cr *headChunkReader // If nil, only read OOO chunks. + maxMmapRef chunks.ChunkDiskMapperRef + oooIsoState *oooIsolationState +} + +func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { + return &HeadAndOOOChunkReader{ + head: head, + mint: mint, + maxt: maxt, + cr: cr, + maxMmapRef: maxMmapRef, + oooIsoState: oooIsoState, } } -type OOOHeadChunkReader struct { - head *Head - mint, maxt int64 - isoState *oooIsolationState - maxMmapRef chunks.ChunkDiskMapperRef -} - -func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *OOOHeadChunkReader { - return &OOOHeadChunkReader{ - head: head, - mint: mint, - maxt: maxt, - isoState: isoState, - maxMmapRef: maxMmapRef, +func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { + sid, _, isOOO := unpackHeadChunkRef(meta.Ref) + if !isOOO { + return cr.cr.ChunkOrIterable(meta) } -} - -func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { - sid, _ := chunks.HeadChunkRef(meta.Ref).Unpack() s := cr.head.series.getByID(sid) // This means that the series has been garbage collected. @@ -311,34 +303,35 @@ func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, } s.Lock() - if s.ooo == nil { - // There is no OOO data for this series. - s.Unlock() - return nil, nil, storage.ErrNotFound - } - mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt, cr.maxMmapRef) + mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) s.Unlock() - if err != nil { - return nil, nil, err - } - // This means that the query range did not overlap with the requested chunk. - if len(mc.chunkIterables) == 0 { - return nil, nil, storage.ErrNotFound - } - - return nil, mc, nil + return nil, mc, err } -func (cr OOOHeadChunkReader) Close() error { - if cr.isoState != nil { - cr.isoState.Close() +// ChunkOrIterableWithCopy: implements ChunkReaderWithCopy. The special Copy behaviour +// is only implemented for the in-order head chunk. +func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { + _, _, isOOO := unpackHeadChunkRef(meta.Ref) + if !isOOO { + return cr.cr.ChunkOrIterableWithCopy(meta) + } + chk, iter, err := cr.ChunkOrIterable(meta) + return chk, iter, 0, err +} + +func (cr *HeadAndOOOChunkReader) Close() error { + if cr.cr != nil && cr.cr.isoState != nil { + cr.cr.isoState.Close() + } + if cr.oooIsoState != nil { + cr.oooIsoState.Close() } return nil } type OOOCompactionHead struct { - oooIR *OOOHeadIndexReader + head *Head lastMmapRef chunks.ChunkDiskMapperRef lastWBLFile int postings []storage.SeriesRef @@ -355,6 +348,7 @@ type OOOCompactionHead struct { // on the sample append latency. So call NewOOOCompactionHead only right before compaction. func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error) { ch := &OOOCompactionHead{ + head: head, chunkRange: head.chunkRange.Load(), mint: math.MaxInt64, maxt: math.MinInt64, @@ -368,15 +362,14 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, ch.lastWBLFile = lastWBLFile } - ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64, 0) + hr := headIndexReader{head: head, mint: ch.mint, maxt: ch.maxt} n, v := index.AllPostingsKey() - - // TODO: verify this gets only ooo samples. - p, err := ch.oooIR.Postings(ctx, n, v) + // TODO: filter to series with OOO samples, before sorting. + p, err := hr.Postings(ctx, n, v) if err != nil { return nil, err } - p = ch.oooIR.SortedPostings(p) + p = hr.SortedPostings(p) var lastSeq, lastOff int for p.Next() { @@ -397,7 +390,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, } var lastMmapRef chunks.ChunkDiskMapperRef - mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper) + mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger) if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 { // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists. mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref} @@ -433,7 +426,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) { } func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) { - return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt, nil, ch.lastMmapRef), nil + return NewHeadAndOOOChunkReader(ch.head, ch.mint, ch.maxt, nil, nil, ch.lastMmapRef), nil } func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) { @@ -459,12 +452,12 @@ func (ch *OOOCompactionHead) Meta() BlockMeta { // Only the method of BlockReader interface are valid for the cloned OOOCompactionHead. func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead { return &OOOCompactionHead{ - oooIR: NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt, 0), + head: ch.head, lastMmapRef: ch.lastMmapRef, postings: ch.postings, chunkRange: ch.chunkRange, - mint: ch.mint, - maxt: ch.maxt, + mint: mint, + maxt: maxt, } } @@ -484,7 +477,8 @@ func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader { } func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter { - return ir.ch.oooIR.Symbols() + hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt} + return hr.Symbols() } func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error) { @@ -505,11 +499,28 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P } func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { - return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount) + hr := headIndexReader{head: ir.ch.head, mint: ir.ch.mint, maxt: ir.ch.maxt} + return hr.ShardedPostings(p, shardIndex, shardCount) } func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { - return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef) + s := ir.ch.head.series.getByID(chunks.HeadSeriesRef(ref)) + + if s == nil { + ir.ch.head.metrics.seriesNotFound.Inc() + return storage.ErrNotFound + } + builder.Assign(s.labels()) + + s.Lock() + defer s.Unlock() + *chks = (*chks)[:0] + + if s.ooo == nil { + return nil + } + + return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, chks) } func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { @@ -537,5 +548,91 @@ func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, posti } func (ir *OOOCompactionHeadIndexReader) Close() error { - return ir.ch.oooIR.Close() + return nil +} + +// HeadAndOOOQuerier queries both the head and the out-of-order head. +type HeadAndOOOQuerier struct { + mint, maxt int64 + head *Head + index IndexReader + chunkr ChunkReader + querier storage.Querier +} + +func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { + cr := &headChunkReader{ + head: head, + mint: mint, + maxt: maxt, + isoState: head.iso.State(mint, maxt), + } + return &HeadAndOOOQuerier{ + mint: mint, + maxt: maxt, + head: head, + index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), + chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), + querier: querier, + } +} + +func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelValues(ctx, name, hints, matchers...) +} + +func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelNames(ctx, hints, matchers...) +} + +func (q *HeadAndOOOQuerier) Close() error { + q.chunkr.Close() + return q.querier.Close() +} + +func (q *HeadAndOOOQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return selectSeriesSet(ctx, sortSeries, hints, matchers, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt) +} + +// HeadAndOOOChunkQuerier queries both the head and the out-of-order head. +type HeadAndOOOChunkQuerier struct { + mint, maxt int64 + head *Head + index IndexReader + chunkr ChunkReader + querier storage.ChunkQuerier +} + +func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier { + cr := &headChunkReader{ + head: head, + mint: mint, + maxt: maxt, + isoState: head.iso.State(mint, maxt), + } + return &HeadAndOOOChunkQuerier{ + mint: mint, + maxt: maxt, + head: head, + index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), + chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), + querier: querier, + } +} + +func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelValues(ctx, name, hints, matchers...) +} + +func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return q.querier.LabelNames(ctx, hints, matchers...) +} + +func (q *HeadAndOOOChunkQuerier) Close() error { + q.chunkr.Close() + return q.querier.Close() +} + +func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { + return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt) } diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index adda30e75f..c121f863c6 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -35,7 +35,7 @@ import ( var ( _ chunkenc.Chunk = &MultiChunk{} _ chunkenc.Iterable = &mergedOOOChunks{} - _ IndexReader = &OOOHeadIndexReader{} + _ IndexReader = &HeadAndOOOIndexReader{} ) type chunkInterval struct { @@ -323,7 +323,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Ref to whatever Ref the chunk has, that we refer to by ID for ref, c := range intervals { if c.ID == e.ID { - meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), chunks.HeadChunkID(ref))) + meta.Ref = chunks.ChunkRef(chunks.NewHeadChunkRef(chunks.HeadSeriesRef(s1ID), s1.oooHeadChunkID(ref))) break } } @@ -348,7 +348,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { }) } - ir := NewOOOHeadIndexReader(h, tc.queryMinT, tc.queryMaxT, 0) + ir := NewHeadAndOOOIndexReader(h, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder @@ -429,17 +429,17 @@ func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenari name: "LabelValues calls with ooo head query range not overlapping out-of-order data", queryMinT: 100, queryMaxT: 100, - expValues1: []string{}, - expValues2: []string{}, - expValues3: []string{}, - expValues4: []string{}, + expValues1: []string{"bar1"}, + expValues2: nil, + expValues3: []string{"bar1", "bar2"}, + expValues4: []string{"bar1", "bar2"}, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { // We first want to test using a head index reader that covers the biggest query interval - oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT, 0) + oh := NewHeadAndOOOIndexReader(head, tc.queryMinT, tc.queryMaxT, 0) matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")} values, err := oh.LabelValues(ctx, "foo", matchers...) sort.Strings(values) @@ -491,10 +491,10 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) { db := newTestDBWithOpts(t, opts) - cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil, 0) + cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0) defer cr.Close() c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ - Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, + Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, }) require.Nil(t, iterable) require.Equal(t, err, fmt.Errorf("not found")) @@ -842,14 +842,14 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { // The Series method populates the chunk metas, taking a copy of the // head OOO chunk if necessary. These are then used by the ChunkReader. - ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) + ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err = ir.Series(s1Ref, &b, &chks) require.NoError(t, err) require.Equal(t, len(tc.expChunksSamples), len(chks)) - cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) + cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0) defer cr.Close() for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i]) @@ -1009,7 +1009,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( // The Series method populates the chunk metas, taking a copy of the // head OOO chunk if necessary. These are then used by the ChunkReader. - ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) + ir := NewHeadAndOOOIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder err = ir.Series(s1Ref, &b, &chks) @@ -1025,7 +1025,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( } require.NoError(t, app.Commit()) - cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, 0) + cr := NewHeadAndOOOChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil, nil, 0) defer cr.Close() for i := 0; i < len(chks); i++ { c, iterable, err := cr.ChunkOrIterable(chks[i]) diff --git a/tsdb/querier.go b/tsdb/querier.go index 910c2d7fc1..2e15f0b084 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -115,20 +115,24 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) { } func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { - mint := q.mint - maxt := q.maxt + return selectSeriesSet(ctx, sortSeries, hints, ms, q.index, q.chunks, q.tombstones, q.mint, q.maxt) +} + +func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64, +) storage.SeriesSet { disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 - p, err := PostingsForMatchers(ctx, q.index, ms...) + p, err := PostingsForMatchers(ctx, index, ms...) if err != nil { return storage.ErrSeriesSet(err) } if sharded { - p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) } if sortSeries { - p = q.index.SortedPostings(p) + p = index.SortedPostings(p) } if hints != nil { @@ -137,11 +141,11 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora disableTrimming = hints.DisableTrimming if hints.Func == "series" { // When you're only looking up metadata (for example series API), you don't need to load any chunks. - return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming) + return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming) } } - return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) + return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming) } // blockChunkQuerier provides chunk querying access to a single block database. @@ -159,8 +163,12 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier } func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { - mint := q.mint - maxt := q.maxt + return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt) +} + +func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + blockID ulid.ULID, index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64, +) storage.ChunkSeriesSet { disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 @@ -169,17 +177,17 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints * maxt = hints.End disableTrimming = hints.DisableTrimming } - p, err := PostingsForMatchers(ctx, q.index, ms...) + p, err := PostingsForMatchers(ctx, index, ms...) if err != nil { return storage.ErrChunkSeriesSet(err) } if sharded { - p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) } if sortSeries { - p = q.index.SortedPostings(p) + p = index.SortedPostings(p) } - return NewBlockChunkSeriesSet(q.blockID, q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) + return NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming) } // PostingsForMatchers assembles a single postings iterator against the index reader @@ -633,14 +641,16 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { } } - hcr, ok := p.cr.(*headChunkReader) + hcr, ok := p.cr.(ChunkReaderWithCopy) var iterable chunkenc.Iterable if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 { - // ChunkWithCopy will copy the head chunk. + // ChunkOrIterableWithCopy will copy the head chunk, if it can. var maxt int64 - p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta) - // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. - p.currMeta.MaxTime = maxt + p.currMeta.Chunk, iterable, maxt, p.err = hcr.ChunkOrIterableWithCopy(p.currMeta) + if p.currMeta.Chunk != nil { + // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. + p.currMeta.MaxTime = maxt + } } else { p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta) } diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 9a82302420..43accc253b 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/require" @@ -254,56 +255,98 @@ func BenchmarkMergedStringIter(b *testing.B) { b.ReportAllocs() } -func BenchmarkQuerierSelect(b *testing.B) { - opts := DefaultHeadOptions() - opts.ChunkRange = 1000 - opts.ChunkDirRoot = b.TempDir() - h, err := NewHead(nil, nil, nil, nil, opts, nil) +func createHeadForBenchmarkSelect(b *testing.B, numSeries int, addSeries func(app storage.Appender, i int)) (*Head, *DB) { + dir := b.TempDir() + opts := DefaultOptions() + opts.OutOfOrderCapMax = 255 + opts.OutOfOrderTimeWindow = 1000 + db, err := Open(dir, nil, nil, opts, nil) require.NoError(b, err) - defer h.Close() + b.Cleanup(func() { + require.NoError(b, db.Close()) + }) + h := db.Head() + app := h.Appender(context.Background()) - numSeries := 1000000 for i := 0; i < numSeries; i++ { - app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0) + addSeries(app, i) } require.NoError(b, app.Commit()) + return h, db +} - bench := func(b *testing.B, br BlockReader, sorted bool) { - matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") - for s := 1; s <= numSeries; s *= 10 { - b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) { - q, err := NewBlockQuerier(br, 0, int64(s-1)) - require.NoError(b, err) +func benchmarkSelect(b *testing.B, queryable storage.Queryable, numSeries int, sorted bool) { + matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") + b.ResetTimer() + for s := 1; s <= numSeries; s *= 10 { + b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) { + q, err := queryable.Querier(0, int64(s-1)) + require.NoError(b, err) - b.ResetTimer() - for i := 0; i < b.N; i++ { - ss := q.Select(context.Background(), sorted, nil, matcher) - for ss.Next() { - } - require.NoError(b, ss.Err()) + b.ResetTimer() + for i := 0; i < b.N; i++ { + ss := q.Select(context.Background(), sorted, nil, matcher) + for ss.Next() { } - q.Close() - }) - } + require.NoError(b, ss.Err()) + } + q.Close() + }) } +} + +func BenchmarkQuerierSelect(b *testing.B) { + numSeries := 1000000 + h, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) { + _, err := app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0) + if err != nil { + b.Fatal(err) + } + }) b.Run("Head", func(b *testing.B) { - bench(b, h, false) + benchmarkSelect(b, db, numSeries, false) }) b.Run("SortedHead", func(b *testing.B) { - bench(b, h, true) + benchmarkSelect(b, db, numSeries, true) }) - tmpdir := b.TempDir() - - blockdir := createBlockFromHead(b, tmpdir, h) - block, err := OpenBlock(nil, blockdir, nil) - require.NoError(b, err) - defer func() { - require.NoError(b, block.Close()) - }() - b.Run("Block", func(b *testing.B) { - bench(b, block, false) + tmpdir := b.TempDir() + + blockdir := createBlockFromHead(b, tmpdir, h) + block, err := OpenBlock(nil, blockdir, nil) + require.NoError(b, err) + defer func() { + require.NoError(b, block.Close()) + }() + + benchmarkSelect(b, (*queryableBlock)(block), numSeries, false) + }) +} + +// Type wrapper to let a Block be a Queryable in benchmarkSelect(). +type queryableBlock Block + +func (pb *queryableBlock) Querier(mint, maxt int64) (storage.Querier, error) { + return NewBlockQuerier((*Block)(pb), mint, maxt) +} + +func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) { + numSeries := 1000000 + _, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) { + l := labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)) + ref, err := app.Append(0, l, int64(i+1), 0) + if err != nil { + b.Fatal(err) + } + _, err = app.Append(ref, l, int64(i), 1) // Out of order sample + if err != nil { + b.Fatal(err) + } + }) + + b.Run("Head", func(b *testing.B) { + benchmarkSelect(b, db, numSeries, false) }) } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index ffdf8dc028..50525f65f4 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -3169,12 +3169,11 @@ func BenchmarkQueries(b *testing.B) { qHead, err := NewBlockQuerier(NewRangeHead(head, 1, nSamples), 1, nSamples) require.NoError(b, err) - qOOOHead, err := NewBlockQuerier(NewOOORangeHead(head, 1, nSamples, 0), 1, nSamples) - require.NoError(b, err) + isoState := head.oooIso.TrackReadAfter(0) + qOOOHead := NewHeadAndOOOQuerier(1, nSamples, head, isoState, qHead) queryTypes = append(queryTypes, qt{ - fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), - storage.NewMergeQuerier([]storage.Querier{qHead, qOOOHead}, nil, storage.ChainedSeriesMerge), + fmt.Sprintf("_Head_oooPercent:%d", oooPercentage), qOOOHead, }) }