*: Consistent Error/Warning handling for SeriesSet iterator: Allowing Async Select (#7251)

* Add errors and Warnings to SeriesSet

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Change Querier interface and refactor accordingly

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Refactor promql/engine to propagate warnings at eval stage

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Make sure all the series from all Selects are pre-advanced

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Separate merge series sets

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Clean

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Refactor merge querier failure handling

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Refactored and simplified fanout with improvements from incoming chunk iterator PRs.

* Secondary logic is hidden, instead of weird failed series set logic we had.
* Fanout is well commented
* Fanout closing record all errors
* MergeQuerier improved API (clearer)
* deferredGenericMergeSeriesSet is not needed as we return no samples anyway for failed series sets (next = false).

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fix formatting

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Fix CI issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Added final tests for error handling.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed Brian's comments.

* Moved hints in populate to be allocated only when needed.
* Used sync.Once in secondary Querier to achieve all-or-nothing partial response logic.
* Select after first Next is done will panic.

NOTE: in lazySeriesSet in theory we could just panic, I think however we can
totally just return error, it will panic in expand anyway.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Utilize errWithWarnings

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Fix recently introduced expansion issue

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add tests for secondary querier error handling

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Implement lazy merge

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add name to test cases

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Reorganize

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review comments

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review comments

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Remove redundant warnings

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Fix rebase mistake

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Kemal Akkoyun 2020-06-09 18:57:31 +02:00 committed by GitHub
parent aff32443f9
commit 66dfb951c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 1113 additions and 616 deletions

View file

@ -29,7 +29,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@ -418,7 +418,7 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, w storage.Warnings, err error) {
func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storage.Warnings, err error) {
ng.metrics.currentQueries.Inc()
defer ng.metrics.currentQueries.Dec()
@ -517,13 +517,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
}
defer querier.Close()
warnings, err := ng.populateSeries(ctxPrepare, querier, s)
ng.populateSeries(querier, s)
prepareSpanTimer.Finish()
if err != nil {
return nil, warnings, err
}
evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
// Instant evaluation. This is executed as a range evaluation with one step.
if s.Start == s.End && s.Interval == 0 {
@ -539,7 +535,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
lookbackDelta: ng.lookbackDelta,
}
val, err := evaluator.Eval(s.Expr)
val, warnings, err := evaluator.Eval(s.Expr)
if err != nil {
return nil, warnings, err
}
@ -588,7 +584,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
logger: ng.logger,
lookbackDelta: ng.lookbackDelta,
}
val, err := evaluator.Eval(s.Expr)
val, warnings, err := evaluator.Eval(s.Expr)
if err != nil {
return nil, warnings, err
}
@ -649,35 +645,29 @@ func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time {
return s.Start.Add(-maxOffset)
}
func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s *parser.EvalStmt) (storage.Warnings, error) {
var (
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable.
evalRange time.Duration
warnings storage.Warnings
err error
)
func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) {
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable.
var evalRange time.Duration
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
var set storage.SeriesSet
var wrn storage.Warnings
hints := &storage.SelectHints{
Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End),
Step: durationToInt64Millis(s.Interval),
}
// We need to make sure we select the timerange selected by the subquery.
// TODO(gouthamve): cumulativeSubqueryOffset gives the sum of range and the offset
// we can optimise it by separating out the range and offsets, and subtracting the offsets
// from end also.
subqOffset := ng.cumulativeSubqueryOffset(path)
offsetMilliseconds := durationMilliseconds(subqOffset)
hints.Start = hints.Start - offsetMilliseconds
switch n := node.(type) {
case *parser.VectorSelector:
hints := &storage.SelectHints{
Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End),
Step: durationToInt64Millis(s.Interval),
}
// We need to make sure we select the timerange selected by the subquery.
// TODO(gouthamve): cumulativeSubqueryOffset gives the sum of range and the offset
// we can optimise it by separating out the range and offsets, and subtracting the offsets
// from end also.
subqOffset := ng.cumulativeSubqueryOffset(path)
offsetMilliseconds := durationMilliseconds(subqOffset)
hints.Start = hints.Start - offsetMilliseconds
if evalRange == 0 {
hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
} else {
@ -696,20 +686,12 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
hints.End = hints.End - offsetMilliseconds
}
set, wrn, err = querier.Select(false, hints, n.LabelMatchers...)
warnings = append(warnings, wrn...)
if err != nil {
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
return err
}
n.UnexpandedSeriesSet = set
n.UnexpandedSeriesSet = querier.Select(false, hints, n.LabelMatchers...)
case *parser.MatrixSelector:
evalRange = n.Range
}
return nil
})
return warnings, err
}
// extractFuncFromPath walks up the path and searches for the first instance of
@ -743,34 +725,40 @@ func extractGroupsFromPath(p []parser.Node) (bool, []string) {
return false, nil
}
func checkForSeriesSetExpansion(ctx context.Context, expr parser.Expr) {
func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (storage.Warnings, error) {
switch e := expr.(type) {
case *parser.MatrixSelector:
checkForSeriesSetExpansion(ctx, e.VectorSelector)
return checkAndExpandSeriesSet(ctx, e.VectorSelector)
case *parser.VectorSelector:
if e.Series == nil {
series, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet)
if err != nil {
panic(err)
} else {
e.Series = series
}
if e.Series != nil {
return nil, nil
}
series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet)
e.Series = series
return ws, err
}
return nil, nil
}
func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) {
func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, ws storage.Warnings, err error) {
for it.Next() {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, nil, ctx.Err()
default:
}
res = append(res, it.At())
}
return res, it.Err()
return res, it.Warnings(), it.Err()
}
type errWithWarnings struct {
err error
warnings storage.Warnings
}
func (e errWithWarnings) Error() string { return e.err.Error() }
// An evaluator evaluates given expressions over given fixed timestamps. It
// is attached to an engine through which it connects to a querier and reports
// errors. On timeout or cancellation of its context it terminates.
@ -799,26 +787,33 @@ func (ev *evaluator) error(err error) {
}
// recover is the handler that turns panics into returns from the top level of evaluation.
func (ev *evaluator) recover(errp *error) {
func (ev *evaluator) recover(ws *storage.Warnings, errp *error) {
e := recover()
if e == nil {
return
}
if err, ok := e.(runtime.Error); ok {
switch err := e.(type) {
case runtime.Error:
// Print the stack trace but do not inhibit the running application.
buf := make([]byte, 64<<10)
buf = buf[:runtime.Stack(buf, false)]
level.Error(ev.logger).Log("msg", "runtime panic in parser", "err", e, "stacktrace", string(buf))
*errp = errors.Wrap(err, "unexpected error")
} else {
case errWithWarnings:
*errp = err.err
*ws = append(*ws, err.warnings...)
default:
*errp = e.(error)
}
}
func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, err error) {
defer ev.recover(&err)
return ev.eval(expr), nil
func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws storage.Warnings, err error) {
defer ev.recover(&ws, &err)
v, ws = ev.eval(expr)
return v, ws, nil
}
// EvalNodeHelper stores extra information and caches for evaluating a single node across steps.
@ -884,17 +879,20 @@ func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.L
// the given function with the values computed for each expression at that
// step. The return value is the combination into time series of all the
// function call results.
func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) Vector, exprs ...parser.Expr) Matrix {
func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
matrixes := make([]Matrix, len(exprs))
origMatrixes := make([]Matrix, len(exprs))
originalNumSamples := ev.currentSamples
var warnings storage.Warnings
for i, e := range exprs {
// Functions will take string arguments from the expressions, not the values.
if e != nil && e.Type() != parser.ValueTypeString {
// ev.currentSamples will be updated to the correct value within the ev.eval call.
matrixes[i] = ev.eval(e).(Matrix)
val, ws := ev.eval(e)
warnings = append(warnings, ws...)
matrixes[i] = val.(Matrix)
// Keep a copy of the original point slices so that they
// can be returned to the pool.
@ -946,11 +944,12 @@ func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) Vector, e
}
// Make the function call.
enh.ts = ts
result := f(args, enh)
result, ws := f(args, enh)
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
enh.out = result[:0] // Reuse result vector.
warnings = append(warnings, ws...)
ev.currentSamples += len(result)
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
@ -969,7 +968,7 @@ func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) Vector, e
mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}}
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
return mat
return mat, warnings
}
// Add samples in output vector to output series.
@ -1001,29 +1000,30 @@ func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) Vector, e
mat = append(mat, ss)
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
return mat
return mat, warnings
}
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) *parser.MatrixSelector {
val := ev.eval(subq).(Matrix)
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, storage.Warnings) {
val, ws := ev.eval(subq)
mat := val.(Matrix)
vs := &parser.VectorSelector{
Offset: subq.Offset,
Series: make([]storage.Series, 0, len(val)),
Series: make([]storage.Series, 0, len(mat)),
}
ms := &parser.MatrixSelector{
Range: subq.Range,
VectorSelector: vs,
}
for _, s := range val {
for _, s := range mat {
vs.Series = append(vs.Series, NewStorageSeries(s))
}
return ms
return ms, ws
}
// eval evaluates the given expression as the given AST expression node requires.
func (ev *evaluator) eval(expr parser.Expr) parser.Value {
func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
// This is the top-level evaluation method.
// Thus, we check for timeout/cancellation here.
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
@ -1035,16 +1035,16 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
case *parser.AggregateExpr:
unwrapParenExpr(&e.Param)
if s, ok := e.Param.(*parser.StringLiteral); ok {
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh), nil
}, e.Expr)
}
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
var param float64
if e.Param != nil {
param = v[0].(Vector)[0].V
}
return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh)
return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh), nil
}, e.Param, e.Expr)
case *parser.Call:
@ -1056,15 +1056,19 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
// a vector selector.
vs, ok := e.Args[0].(*parser.VectorSelector)
if ok {
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return call([]parser.Value{ev.vectorSelector(vs, enh.ts)}, e.Args, enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
val, ws := ev.vectorSelector(vs, enh.ts)
return call([]parser.Value{val}, e.Args, enh), ws
})
}
}
// Check if the function has a matrix argument.
var matrixArgIndex int
var matrixArg bool
var (
matrixArgIndex int
matrixArg bool
warnings storage.Warnings
)
for i := range e.Args {
unwrapParenExpr(&e.Args[i])
a := e.Args[i]
@ -1078,14 +1082,16 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
matrixArgIndex = i
matrixArg = true
// Replacing parser.SubqueryExpr with parser.MatrixSelector.
e.Args[i] = ev.evalSubquery(subq)
val, ws := ev.evalSubquery(subq)
e.Args[i] = val
warnings = append(warnings, ws...)
break
}
}
if !matrixArg {
// Does not have a matrix argument.
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return call(v, e.Args, enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return call(v, e.Args, enh), warnings
}, e.Args...)
}
@ -1095,16 +1101,22 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
otherInArgs := make([]Vector, len(e.Args))
for i, e := range e.Args {
if i != matrixArgIndex {
otherArgs[i] = ev.eval(e).(Matrix)
val, ws := ev.eval(e)
otherArgs[i] = val.(Matrix)
otherInArgs[i] = Vector{Sample{}}
inArgs[i] = otherInArgs[i]
warnings = append(warnings, ws...)
}
}
sel := e.Args[matrixArgIndex].(*parser.MatrixSelector)
selVS := sel.VectorSelector.(*parser.VectorSelector)
checkForSeriesSetExpansion(ev.ctx, sel)
ws, err := checkAndExpandSeriesSet(ev.ctx, sel)
warnings = append(warnings, ws...)
if err != nil {
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), warnings})
}
mat := make(Matrix, 0, len(selVS.Series)) // Output matrix.
offset := durationMilliseconds(selVS.Offset)
selRange := durationMilliseconds(sel.Range)
@ -1182,7 +1194,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
// Iterate once to look for a complete series.
for _, s := range mat {
if len(s.Points) == steps {
return Matrix{}
return Matrix{}, warnings
}
}
@ -1193,7 +1205,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
found[p.T] = struct{}{}
}
if i > 0 && len(found) == steps {
return Matrix{}
return Matrix{}, warnings
}
}
@ -1209,20 +1221,21 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
Metric: createLabelsForAbsentFunction(e.Args[0]),
Points: newp,
},
}
}, warnings
}
if mat.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
return mat
return mat, warnings
case *parser.ParenExpr:
return ev.eval(e.Expr)
case *parser.UnaryExpr:
mat := ev.eval(e.Expr).(Matrix)
val, ws := ev.eval(e.Expr)
mat := val.(Matrix)
if e.Op == parser.SUB {
for i := range mat {
mat[i].Metric = dropMetricName(mat[i].Metric)
@ -1234,53 +1247,56 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
ev.errorf("vector cannot contain metrics with the same labelset")
}
}
return mat
return mat, ws
case *parser.BinaryExpr:
switch lt, rt := e.LHS.Type(), e.RHS.Type(); {
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
val := scalarBinop(e.Op, v[0].(Vector)[0].Point.V, v[1].(Vector)[0].Point.V)
return append(enh.out, Sample{Point: Point{V: val}})
return append(enh.out, Sample{Point: Point{V: val}}), nil
}, e.LHS, e.RHS)
case lt == parser.ValueTypeVector && rt == parser.ValueTypeVector:
switch e.Op {
case parser.LAND:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
}, e.LHS, e.RHS)
case parser.LOR:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
}, e.LHS, e.RHS)
case parser.LUNLESS:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
}, e.LHS, e.RHS)
default:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh), nil
}, e.LHS, e.RHS)
}
case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].Point.V}, false, e.ReturnBool, enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].Point.V}, false, e.ReturnBool, enh), nil
}, e.LHS, e.RHS)
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].Point.V}, true, e.ReturnBool, enh)
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].Point.V}, true, e.ReturnBool, enh), nil
}, e.LHS, e.RHS)
}
case *parser.NumberLiteral:
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) Vector {
return append(enh.out, Sample{Point: Point{V: e.Val}})
return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
return append(enh.out, Sample{Point: Point{V: e.Val}}), nil
})
case *parser.VectorSelector:
checkForSeriesSetExpansion(ev.ctx, e)
ws, err := checkAndExpandSeriesSet(ev.ctx, e)
if err != nil {
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws})
}
mat := make(Matrix, 0, len(e.Series))
it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta))
for i, s := range e.Series {
@ -1307,9 +1323,8 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
} else {
putPointSlice(ss.Points)
}
}
return mat
return mat, ws
case *parser.MatrixSelector:
if ev.startTimestamp != ev.endTimestamp {
@ -1342,11 +1357,11 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
newEv.startTimestamp += newEv.interval
}
res := newEv.eval(e.Expr)
res, ws := newEv.eval(e.Expr)
ev.currentSamples = newEv.currentSamples
return res
return res, ws
case *parser.StringLiteral:
return String{V: e.Val, T: ev.startTimestamp}
return String{V: e.Val, T: ev.startTimestamp}, nil
}
panic(errors.Errorf("unhandled expression of type: %T", expr))
@ -1357,13 +1372,12 @@ func durationToInt64Millis(d time.Duration) int64 {
}
// vectorSelector evaluates a *parser.VectorSelector expression.
func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) Vector {
checkForSeriesSetExpansion(ev.ctx, node)
var (
vec = make(Vector, 0, len(node.Series))
)
func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) {
ws, err := checkAndExpandSeriesSet(ev.ctx, node)
if err != nil {
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws})
}
vec := make(Vector, 0, len(node.Series))
it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta))
for i, s := range node.Series {
it.Reset(s.Iterator())
@ -1381,7 +1395,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) Vecto
ev.error(ErrTooManySamples(env))
}
}
return vec
return vec, ws
}
// vectorSelectorSingle evaluates a instant vector for the iterator of one time series.
@ -1429,21 +1443,23 @@ func putPointSlice(p []Point) {
}
// matrixSelector evaluates a *parser.MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix {
checkForSeriesSetExpansion(ev.ctx, node)
vs := node.VectorSelector.(*parser.VectorSelector)
func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storage.Warnings) {
var (
vs = node.VectorSelector.(*parser.VectorSelector)
offset = durationMilliseconds(vs.Offset)
maxt = ev.startTimestamp - offset
mint = maxt - durationMilliseconds(node.Range)
matrix = make(Matrix, 0, len(vs.Series))
it = storage.NewBuffer(durationMilliseconds(node.Range))
)
ws, err := checkAndExpandSeriesSet(ev.ctx, node)
if err != nil {
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws})
}
it := storage.NewBuffer(durationMilliseconds(node.Range))
series := vs.Series
for i, s := range series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err)
@ -1461,7 +1477,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix {
putPointSlice(ss.Points)
}
}
return matrix
return matrix, ws
}
// matrixIterSlice populates a matrix vector covering the requested range for a

View file

@ -172,8 +172,8 @@ type errQuerier struct {
err error
}
func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return errSeriesSet{err: q.err}, nil, q.err
func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return errSeriesSet{err: q.err}
}
func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
@ -184,9 +184,10 @@ type errSeriesSet struct {
err error
}
func (errSeriesSet) Next() bool { return false }
func (errSeriesSet) At() storage.Series { return nil }
func (e errSeriesSet) Err() error { return e.err }
func (errSeriesSet) Next() bool { return false }
func (errSeriesSet) At() storage.Series { return nil }
func (e errSeriesSet) Err() error { return e.err }
func (e errSeriesSet) Warnings() storage.Warnings { return nil }
func TestQueryError(t *testing.T) {
opts := EngineOpts{
@ -208,14 +209,14 @@ func TestQueryError(t *testing.T) {
res := vectorQuery.Exec(ctx)
testutil.NotOk(t, res.Err, "expected error on failed select but got none")
testutil.Equals(t, errStorage, res.Err)
testutil.Assert(t, errors.Is(res.Err, errStorage), "expected error doesn't match")
matrixQuery, err := engine.NewInstantQuery(queryable, "foo[1m]", time.Unix(1, 0))
testutil.Ok(t, err)
res = matrixQuery.Exec(ctx)
testutil.NotOk(t, res.Err, "expected error on failed select but got none")
testutil.Equals(t, errStorage, res.Err)
testutil.Assert(t, errors.Is(res.Err, errStorage), "expected error doesn't match")
}
// hintCheckerQuerier implements storage.Querier which checks the start and end times
@ -231,7 +232,7 @@ type hintCheckerQuerier struct {
t *testing.T
}
func (q *hintCheckerQuerier) Select(_ bool, sp *storage.SelectHints, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *hintCheckerQuerier) Select(_ bool, sp *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet {
testutil.Equals(q.t, q.start, sp.Start)
testutil.Equals(q.t, q.end, sp.End)
testutil.Equals(q.t, q.grouping, sp.Grouping)
@ -239,7 +240,7 @@ func (q *hintCheckerQuerier) Select(_ bool, sp *storage.SelectHints, _ ...*label
testutil.Equals(q.t, q.selRange, sp.Range)
testutil.Equals(q.t, q.function, sp.Func)
return errSeriesSet{err: nil}, nil, nil
return errSeriesSet{err: nil}
}
func (*hintCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) {
return nil, nil, nil
@ -594,9 +595,8 @@ load 10s
}
testutil.Ok(t, res.Err)
testutil.Equals(t, c.Result, res.Value)
testutil.Equals(t, c.Result, res.Value, "query %q failed", c.Query)
}
}
func TestMaxQuerySamples(t *testing.T) {
@ -826,7 +826,7 @@ load 10s
res := qry.Exec(test.Context())
testutil.Equals(t, c.Result.Err, res.Err)
testutil.Equals(t, c.Result.Value, res.Value)
testutil.Equals(t, c.Result.Value, res.Value, "query %q failed", c.Query)
}
}
@ -834,7 +834,7 @@ func TestRecoverEvaluatorRuntime(t *testing.T) {
ev := &evaluator{logger: log.NewNopLogger()}
var err error
defer ev.recover(&err)
defer ev.recover(nil, &err)
// Cause a runtime panic.
var a []int
@ -857,7 +857,31 @@ func TestRecoverEvaluatorError(t *testing.T) {
t.Fatalf("wrong error message: %q, expected %q", err, e)
}
}()
defer ev.recover(&err)
defer ev.recover(nil, &err)
panic(e)
}
func TestRecoverEvaluatorErrorWithWarnings(t *testing.T) {
ev := &evaluator{logger: log.NewNopLogger()}
var err error
var ws storage.Warnings
warnings := storage.Warnings{errors.New("custom warning")}
e := errWithWarnings{
err: errors.New("custom error"),
warnings: warnings,
}
defer func() {
if err.Error() != e.Error() {
t.Fatalf("wrong error message: %q, expected %q", err, e)
}
if len(ws) != len(warnings) && ws[0] != warnings[0] {
t.Fatalf("wrong warning message: %q, expected %q", ws[0], warnings[0])
}
}()
defer ev.recover(&ws, &err)
panic(e)
}

View file

@ -133,8 +133,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
}
// Get the series for the matcher.
ss, _, err := querier.Select(false, nil, matchers...)
testutil.Ok(t, err)
ss := querier.Select(false, nil, matchers...)
testutil.Assert(t, ss.Next(), "")
storageSeries := ss.At()
testutil.Assert(t, !ss.Next(), "Expecting only 1 series")

View file

@ -707,12 +707,7 @@ func (g *Group) RestoreForState(ts time.Time) {
matchers = append(matchers, mt)
}
sset, err, _ := q.Select(false, nil, matchers...)
if err != nil {
level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(), "stage", "Select", "err", err)
return
}
sset := q.Select(false, nil, matchers...)
seriesFound := false
var s storage.Series
@ -727,6 +722,17 @@ func (g *Group) RestoreForState(ts time.Time) {
}
}
if err := sset.Err(); err != nil {
// Querier Warnings are ignored. We do not care unless we have an error.
level.Error(g.logger).Log(
"msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(),
"stage", "Select",
"err", err,
)
return
}
if !seriesFound {
return
}

View file

@ -563,9 +563,7 @@ func TestStaleness(t *testing.T) {
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
testutil.Ok(t, err)
set, _, err := querier.Select(false, nil, matcher)
testutil.Ok(t, err)
set := querier.Select(false, nil, matcher)
samples, err := readSeriesSet(set)
testutil.Ok(t, err)
@ -686,9 +684,7 @@ func TestDeletedRuleMarkedStale(t *testing.T) {
matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1")
testutil.Ok(t, err)
set, _, err := querier.Select(false, nil, matcher)
testutil.Ok(t, err)
set := querier.Select(false, nil, matcher)
samples, err := readSeriesSet(set)
testutil.Ok(t, err)
@ -1107,9 +1103,7 @@ func countStaleNaN(t *testing.T, st storage.Storage) int {
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2")
testutil.Ok(t, err)
set, _, err := querier.Select(false, nil, matcher)
testutil.Ok(t, err)
set := querier.Select(false, nil, matcher)
samples, err := readSeriesSet(set)
testutil.Ok(t, err)

View file

@ -1617,9 +1617,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Equals(t, false, series.Next(), "series found in tsdb")
testutil.Ok(t, series.Err())
// We add a good metric to check that it is recorded.
_, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
@ -1627,9 +1627,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
testutil.Ok(t, err)
series = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
testutil.Equals(t, true, series.Next(), "series not found in tsdb")
testutil.Ok(t, series.Err())
testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
}
@ -1663,9 +1663,9 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Equals(t, false, series.Next(), "series found in tsdb")
testutil.Ok(t, series.Err())
}
func TestReusableConfig(t *testing.T) {

View file

@ -16,14 +16,15 @@ package storage
import (
"container/heap"
"context"
"reflect"
"sort"
"strings"
"sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -37,8 +38,15 @@ type fanout struct {
secondaries []Storage
}
// NewFanout returns a new fan-out Storage, which proxies reads and writes
// NewFanout returns a new fanout Storage, which proxies reads and writes
// through to multiple underlying storages.
//
// The difference between primary and secondary Storage is only for read (Querier) path and it goes as follows:
// * If the primary querier returns an error, then any of the Querier operations will fail.
// * If any secondary querier returns an error the result from that queries is discarded. The overall operation will succeed,
// and the error from the secondary querier will be returned as a warning.
//
// NOTE: In the case of Prometheus, it treats all remote storages as secondary / best effort.
func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage {
return &fanout{
logger: logger,
@ -56,8 +64,8 @@ func (f *fanout) StartTime() (int64, error) {
return int64(model.Latest), err
}
for _, storage := range f.secondaries {
t, err := storage.StartTime()
for _, s := range f.secondaries {
t, err := s.StartTime()
if err != nil {
return int64(model.Latest), err
}
@ -69,29 +77,27 @@ func (f *fanout) StartTime() (int64, error) {
}
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
queriers := make([]Querier, 0, 1+len(f.secondaries))
// Add primary querier.
primaryQuerier, err := f.primary.Querier(ctx, mint, maxt)
primary, err := f.primary.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}
queriers = append(queriers, primaryQuerier)
// Add secondary queriers.
secondaries := make([]Querier, 0, len(f.secondaries))
for _, storage := range f.secondaries {
querier, err := storage.Querier(ctx, mint, maxt)
if err != nil {
for _, q := range queriers {
// TODO(bwplotka): Log error.
_ = q.Close()
// Close already open Queriers, append potential errors to returned error.
errs := tsdb_errors.MultiError{err}
errs.Add(primary.Close())
for _, q := range secondaries {
errs.Add(q.Close())
}
return nil, err
return nil, errs.Err()
}
queriers = append(queriers, querier)
}
return NewMergeQuerier(primaryQuerier, queriers, ChainedSeriesMerge), nil
secondaries = append(secondaries, querier)
}
return NewMergeQuerier(primary, secondaries, ChainedSeriesMerge), nil
}
func (f *fanout) Appender() Appender {
@ -109,18 +115,12 @@ func (f *fanout) Appender() Appender {
// Close closes the storage and all its underlying resources.
func (f *fanout) Close() error {
if err := f.primary.Close(); err != nil {
return err
errs := tsdb_errors.MultiError{}
errs.Add(f.primary.Close())
for _, s := range f.secondaries {
errs.Add(s.Close())
}
// TODO return multiple errors?
var lastErr error
for _, storage := range f.secondaries {
if err := storage.Close(); err != nil {
lastErr = err
}
}
return lastErr
return errs.Err()
}
// fanoutAppender implements Appender.
@ -188,153 +188,138 @@ func (f *fanoutAppender) Rollback() (err error) {
}
type mergeGenericQuerier struct {
mergeFunc genericSeriesMergeFunc
queriers []genericQuerier
primaryQuerier genericQuerier
queriers []genericQuerier
failedQueriers map[genericQuerier]struct{}
setQuerierMap map[genericSeriesSet]genericQuerier
// mergeFn is used when we see series from different queriers Selects with the same labels.
mergeFn genericSeriesMergeFunc
}
// NewMergeQuerier returns a new Querier that merges results of chkQuerierSeries queriers.
// NewMergeQuerier will return NoopQuerier if no queriers are passed to it
// and will filter NoopQueriers from its arguments, in order to reduce overhead
// when only one querier is passed.
// The difference between primary and secondary is as follows: f the primaryQuerier returns an error, query fails.
// For secondaries it just return warnings.
func NewMergeQuerier(primaryQuerier Querier, queriers []Querier, mergeFunc VerticalSeriesMergeFunc) Querier {
filtered := make([]genericQuerier, 0, len(queriers))
for _, querier := range queriers {
// NewMergeQuerier returns a new Querier that merges results of given primary and slice of secondary queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primary + secondaries Selects, merge function will be used.
func NewMergeQuerier(primary Querier, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
queriers := make([]genericQuerier, 0, len(secondaries)+1)
if primary != nil {
queriers = append(queriers, newGenericQuerierFrom(primary))
}
for _, querier := range secondaries {
if _, ok := querier.(noopQuerier); !ok && querier != nil {
filtered = append(filtered, newGenericQuerierFrom(querier))
queriers = append(queriers, newSecondaryQuerierFrom(querier))
}
}
if len(filtered) == 0 {
return primaryQuerier
}
if primaryQuerier == nil && len(filtered) == 1 {
return &querierAdapter{filtered[0]}
}
return &querierAdapter{&mergeGenericQuerier{
mergeFunc: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge,
primaryQuerier: newGenericQuerierFrom(primaryQuerier),
queriers: filtered,
failedQueriers: make(map[genericQuerier]struct{}),
setQuerierMap: make(map[genericSeriesSet]genericQuerier),
mergeFn: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
}}
}
// NewMergeChunkQuerier returns a new ChunkQuerier that merges results of chkQuerierSeries chunk queriers.
// NewMergeChunkQuerier will return NoopChunkQuerier if no chunk queriers are passed to it,
// and will filter NoopQuerieNoopChunkQuerierrs from its arguments, in order to reduce overhead
// when only one chunk querier is passed.
func NewMergeChunkQuerier(primaryQuerier ChunkQuerier, queriers []ChunkQuerier, merger VerticalChunkSeriesMergerFunc) ChunkQuerier {
filtered := make([]genericQuerier, 0, len(queriers))
for _, querier := range queriers {
// NewMergeChunkQuerier returns a new ChunkQuerier that merges results of given primary and slice of secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primary + secondaries Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergerFunc) ChunkQuerier {
queriers := make([]genericQuerier, 0, len(secondaries)+1)
if primary != nil {
queriers = append(queriers, newGenericQuerierFromChunk(primary))
}
for _, querier := range secondaries {
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
filtered = append(filtered, newGenericQuerierFromChunk(querier))
queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
}
}
if len(filtered) == 0 {
return primaryQuerier
}
if primaryQuerier == nil && len(filtered) == 1 {
return &chunkQuerierAdapter{filtered[0]}
}
return &chunkQuerierAdapter{&mergeGenericQuerier{
mergeFunc: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge,
primaryQuerier: newGenericQuerierFromChunk(primaryQuerier),
queriers: filtered,
failedQueriers: make(map[genericQuerier]struct{}),
setQuerierMap: make(map[genericSeriesSet]genericQuerier),
mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: mergeFn}).Merge,
queriers: queriers,
}}
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) {
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if len(q.queriers) == 1 {
return q.queriers[0].Select(sortSeries, hints, matchers...)
}
var (
seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
warnings Warnings
priErr error
seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
wg sync.WaitGroup
seriesSetChan = make(chan genericSeriesSet)
)
type queryResult struct {
qr genericQuerier
set genericSeriesSet
wrn Warnings
selectError error
}
queryResultChan := make(chan *queryResult)
// Schedule all Selects for all queriers we know about.
for _, querier := range q.queriers {
wg.Add(1)
go func(qr genericQuerier) {
defer wg.Done()
// We need to sort for NewMergeSeriesSet to work.
set, wrn, err := qr.Select(true, hints, matchers...)
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
seriesSetChan <- qr.Select(true, hints, matchers...)
}(querier)
}
for i := 0; i < len(q.queriers); i++ {
qryResult := <-queryResultChan
q.setQuerierMap[qryResult.set] = qryResult.qr
if qryResult.wrn != nil {
warnings = append(warnings, qryResult.wrn...)
go func() {
wg.Wait()
close(seriesSetChan)
}()
for r := range seriesSetChan {
seriesSets = append(seriesSets, r)
}
return &lazySeriesSet{create: create(seriesSets, q.mergeFn)}
}
func create(seriesSets []genericSeriesSet, mergeFn genericSeriesMergeFunc) func() (genericSeriesSet, bool) {
// Returned function gets called with the first call to Next().
return func() (genericSeriesSet, bool) {
if len(seriesSets) == 1 {
return seriesSets[0], seriesSets[0].Next()
}
if qryResult.selectError != nil {
q.failedQueriers[qryResult.qr] = struct{}{}
// If the error source isn't the primary querier, return the error as a warning and continue.
if !reflect.DeepEqual(qryResult.qr, q.primaryQuerier) {
warnings = append(warnings, qryResult.selectError)
} else {
priErr = qryResult.selectError
var h genericSeriesSetHeap
for _, set := range seriesSets {
if set == nil {
continue
}
if set.Next() {
heap.Push(&h, set)
continue
}
// When primary fails ignore results from secondaries.
// Only the primary querier returns error.
if err := set.Err(); err != nil {
return errorOnlySeriesSet{err}, false
}
continue
}
seriesSets = append(seriesSets, qryResult.set)
set := &genericMergeSeriesSet{
mergeFn: mergeFn,
sets: seriesSets,
heap: h,
}
return set, set.Next()
}
if priErr != nil {
return nil, nil, priErr
}
return newGenericMergeSeriesSet(seriesSets, q, q.mergeFunc), warnings, nil
}
// LabelValues returns all potential values for a label name.
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
var results [][]string
var warnings Warnings
var (
results [][]string
warnings Warnings
)
for _, querier := range q.queriers {
values, wrn, err := querier.LabelValues(name)
if wrn != nil {
// TODO(bwplotka): We could potentially wrap warnings.
warnings = append(warnings, wrn...)
}
if err != nil {
q.failedQueriers[querier] = struct{}{}
// If the error source isn't the primary querier, return the error as a warning and continue.
if querier != q.primaryQuerier {
warnings = append(warnings, err)
continue
} else {
return nil, nil, err
}
return nil, nil, errors.Wrapf(err, "LabelValues() from Querier for label %s", name)
}
results = append(results, values)
}
return mergeStringSlices(results), warnings, nil
}
func (q *mergeGenericQuerier) IsFailedSet(set genericSeriesSet) bool {
_, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]]
return isFailedQuerier
}
func mergeStringSlices(ss [][]string) []string {
switch len(ss) {
case 0:
@ -381,37 +366,31 @@ func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
for _, querier := range q.queriers {
names, wrn, err := querier.LabelNames()
if wrn != nil {
// TODO(bwplotka): We could potentially wrap warnings.
warnings = append(warnings, wrn...)
}
if err != nil {
q.failedQueriers[querier] = struct{}{}
// If the error source isn't the primaryQuerier querier, return the error as a warning and continue.
if querier != q.primaryQuerier {
warnings = append(warnings, err)
continue
} else {
return nil, nil, errors.Wrap(err, "LabelNames() from Querier")
}
return nil, nil, errors.Wrap(err, "LabelNames() from Querier")
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
}
if len(labelNamesMap) == 0 {
return nil, warnings, nil
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, warnings, nil
}
// Close releases the resources of the Querier.
func (q *mergeGenericQuerier) Close() error {
var errs tsdb_errors.MultiError
errs := tsdb_errors.MultiError{}
for _, querier := range q.queriers {
if err := querier.Close(); err != nil {
errs.Add(err)
@ -420,18 +399,6 @@ func (q *mergeGenericQuerier) Close() error {
return errs.Err()
}
// genericMergeSeriesSet implements genericSeriesSet
type genericMergeSeriesSet struct {
currentLabels labels.Labels
mergeFunc genericSeriesMergeFunc
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
querier *mergeGenericQuerier
}
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series
@ -447,7 +414,7 @@ func NewMergeSeriesSet(sets []SeriesSet, merger VerticalSeriesMergeFunc) SeriesS
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
}
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, (&seriesMergerAdapter{VerticalSeriesMergeFunc: merger}).Merge)}
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: merger}).Merge)}
}
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges results of chkQuerierSeries ChunkSeriesSets.
@ -457,21 +424,30 @@ func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, merger VerticalChunkSeriesMer
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge)}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge)}
}
// genericMergeSeriesSet implements genericSeriesSet.
type genericMergeSeriesSet struct {
currentLabels labels.Labels
mergeFn genericSeriesMergeFunc
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
// series returned by the chkQuerierSeries series sets when iterating.
// Each chkQuerierSeries series set must return its series in labels order, otherwise
// series returned by the series sets when iterating.
// Each series set must return its series in labels order, otherwise
// merged series set will be incorrect.
// Argument 'querier' is optional and can be nil. Pass Querier if you want to retry query in case of failing series set.
// Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet(sets []genericSeriesSet, querier *mergeGenericQuerier, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
// Overlapping cases are merged using provided mergeFn.
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFn genericSeriesMergeFunc) genericSeriesSet {
if len(sets) == 1 {
return sets[0]
}
// Sets need to be pre-advanced, so we can introspect the label of the
// We are pre-advancing sets, so we can introspect the label of the
// series under the cursor.
var h genericSeriesSetHeap
for _, set := range sets {
@ -483,26 +459,25 @@ func newGenericMergeSeriesSet(sets []genericSeriesSet, querier *mergeGenericQuer
}
}
return &genericMergeSeriesSet{
mergeFunc: mergeFunc,
heap: h,
sets: sets,
querier: querier,
mergeFn: mergeFn,
sets: sets,
heap: h,
}
}
func (c *genericMergeSeriesSet) Next() bool {
// Run in a loop because the "next" series sets may not be valid anymore.
// If a remote querier fails, we discard all series sets from that querier.
// If, for the current label set, all the next series sets come from
// failed remote storage sources, we want to keep trying with the next label set.
for {
// Firstly advance all the current series sets. If any of them have run out
// Firstly advance all the current series sets. If any of them have run out
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.heap, set)
}
}
if len(c.heap) == 0 {
return false
}
@ -512,9 +487,6 @@ func (c *genericMergeSeriesSet) Next() bool {
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(genericSeriesSet)
if c.querier != nil && c.querier.IsFailedSet(set) {
continue
}
c.currentSets = append(c.currentSets, set)
}
@ -535,7 +507,7 @@ func (c *genericMergeSeriesSet) At() Labels {
for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At())
}
return c.mergeFunc(series...)
return c.mergeFn(series...)
}
func (c *genericMergeSeriesSet) Err() error {
@ -547,6 +519,14 @@ func (c *genericMergeSeriesSet) Err() error {
return nil
}
func (c *genericMergeSeriesSet) Warnings() Warnings {
var ws Warnings
for _, set := range c.sets {
ws = append(ws, set.Warnings()...)
}
return ws
}
type genericSeriesSetHeap []genericSeriesSet
func (h genericSeriesSetHeap) Len() int { return len(h) }

View file

@ -16,14 +16,14 @@ package storage
import (
"context"
"errors"
"testing"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
"testing"
)
func TestSelectSorted(t *testing.T) {
@ -79,8 +79,7 @@ func TestSelectSorted(t *testing.T) {
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
testutil.Ok(t, err)
seriesSet, _, err := querier.Select(true, nil, matcher)
testutil.Ok(t, err)
seriesSet := querier.Select(true, nil, matcher)
result := make(map[int64]float64)
var labelsResult labels.Labels
@ -95,6 +94,7 @@ func TestSelectSorted(t *testing.T) {
}
}
testutil.Ok(t, seriesSet.Err())
testutil.Equals(t, labelsResult, outputLabel)
testutil.Equals(t, inputTotalSize, len(result))
}
@ -131,19 +131,12 @@ func TestFanoutErrors(t *testing.T) {
defer querier.Close()
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
ss, warnings, err := querier.Select(true, nil, matcher)
testutil.Equals(t, tc.err, err)
testutil.Equals(t, tc.warnings, warnings)
// Only test series iteration if there are no errors.
if err != nil {
continue
}
ss := querier.Select(true, nil, matcher)
for ss.Next() {
ss.At()
}
testutil.Ok(t, ss.Err())
testutil.Equals(t, tc.err, ss.Err())
testutil.Equals(t, tc.warnings, ss.Warnings())
}
}
@ -169,8 +162,8 @@ func (errStorage) Close() error {
type errQuerier struct{}
func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return nil, nil, errSelect
func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return storage.ErrSeriesSet(errSelect)
}
func (errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {

View file

@ -17,8 +17,11 @@ import (
"fmt"
"math"
"sort"
"sync"
"testing"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
@ -56,24 +59,37 @@ func TestMergeTwoStringSlices(t *testing.T) {
func TestMergeQuerierWithChainMerger(t *testing.T) {
for _, tc := range []struct {
name string
querierSeries [][]Series
extraQueriers []Querier
name string
primaryQuerierSeries []Series
querierSeries [][]Series
extraQueriers []Querier
expected SeriesSet
}{
{
name: "1 querier with no series",
name: "one primary querier with no series",
primaryQuerierSeries: []Series{},
expected: NewMockSeriesSet(),
},
{
name: "one secondary querier with no series",
querierSeries: [][]Series{{}},
expected: NewMockSeriesSet(),
},
{
name: "many queriers with no series",
name: "many secondary queriers with no series",
querierSeries: [][]Series{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockSeriesSet(),
},
{
name: "1 querier, two series",
name: "mix of queriers with no series",
primaryQuerierSeries: []Series{},
querierSeries: [][]Series{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockSeriesSet(),
},
// Test rest of cases on secondary queriers as the different between primary vs secondary is just error handling.
{
name: "one querier, two series",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
@ -84,7 +100,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
),
},
{
name: "2 queriers, 1 different series each",
name: "two queriers, one different series each",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
}, {
@ -96,7 +112,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
),
},
{
name: "2 time unsorted queriers, 2 series each",
name: "two time unsorted queriers, two series each",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
@ -116,7 +132,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
),
},
{
name: "5 queriers, only 2 queriers have 2 time unsorted series each",
name: "five queriers, only two queriers have two time unsorted series each",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
@ -136,7 +152,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
),
},
{
name: "2 queriers, only 2 queriers have 2 time unsorted series each, with 3 noop and one nil querier together",
name: "two queriers, only two queriers have two time unsorted series each, with 3 noop and one nil querier together",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
@ -157,7 +173,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
),
},
{
name: "2 queriers, with 2 series, one is overlapping",
name: "two queriers, with two series, one is overlapping",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 21}, sample{3, 31}, sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
@ -177,7 +193,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
),
},
{
name: "2 queries, one with NaN samples series",
name: "two queries, one with NaN samples series",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}),
}, {
@ -189,13 +205,17 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
var p Querier
if tc.primaryQuerierSeries != nil {
p = &mockQuerier{toReturn: tc.primaryQuerierSeries}
}
var qs []Querier
for _, in := range tc.querierSeries {
qs = append(qs, &mockQuerier{toReturn: in})
}
qs = append(qs, tc.extraQueriers...)
mergedQuerier, _, _ := NewMergeQuerier(qs[0], qs, ChainedSeriesMerge).Select(false, nil)
mergedQuerier := NewMergeQuerier(p, qs, ChainedSeriesMerge).Select(false, nil)
// Get all merged series upfront to make sure there are no incorrectly retained shared
// buffers causing bugs.
@ -222,22 +242,35 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
for _, tc := range []struct {
name string
chkQuerierSeries [][]ChunkSeries
extraQueriers []ChunkQuerier
name string
primaryChkQuerierSeries []ChunkSeries
chkQuerierSeries [][]ChunkSeries
extraQueriers []ChunkQuerier
expected ChunkSeriesSet
}{
{
name: "one querier with no series",
name: "one primary querier with no series",
primaryChkQuerierSeries: []ChunkSeries{},
expected: NewMockChunkSeriesSet(),
},
{
name: "one secondary querier with no series",
chkQuerierSeries: [][]ChunkSeries{{}},
expected: NewMockChunkSeriesSet(),
},
{
name: "many queriers with no series",
name: "many secondary queriers with no series",
chkQuerierSeries: [][]ChunkSeries{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockChunkSeriesSet(),
},
{
name: "mix of queriers with no series",
primaryChkQuerierSeries: []ChunkSeries{},
chkQuerierSeries: [][]ChunkSeries{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockChunkSeriesSet(),
},
// Test rest of cases on secondary queriers as the different between primary vs secondary is just error handling.
{
name: "one querier, two series",
chkQuerierSeries: [][]ChunkSeries{{
@ -347,13 +380,19 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
var p ChunkQuerier
if tc.primaryChkQuerierSeries != nil {
p = &mockChunkQurier{toReturn: tc.primaryChkQuerierSeries}
}
var qs []ChunkQuerier
for _, in := range tc.chkQuerierSeries {
qs = append(qs, &mockChunkQurier{toReturn: in})
}
qs = append(qs, tc.extraQueriers...)
merged, _, _ := NewMergeChunkQuerier(qs[0], qs, NewVerticalChunkSeriesMerger(nil)).Select(false, nil)
// TODO(bwplotka): Add case of overlap to check if those are handled well.
merged := NewMergeChunkQuerier(p, qs, NewVerticalChunkSeriesMerger(nil)).Select(false, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
@ -384,14 +423,14 @@ func (a seriesByLabel) Len() int { return len(a) }
func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) (SeriesSet, Warnings, error) {
func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet {
cpy := make([]Series, len(m.toReturn))
copy(cpy, m.toReturn)
if sortSeries {
sort.Sort(seriesByLabel(cpy))
}
return NewMockSeriesSet(cpy...), nil, nil
return NewMockSeriesSet(cpy...)
}
type mockChunkQurier struct {
@ -408,14 +447,14 @@ func (a chunkSeriesByLabel) Less(i, j int) bool {
return labels.Compare(a[i].Labels(), a[j].Labels()) < 0
}
func (m *mockChunkQurier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
func (m *mockChunkQurier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) ChunkSeriesSet {
cpy := make([]ChunkSeries, len(m.toReturn))
copy(cpy, m.toReturn)
if sortSeries {
sort.Sort(chunkSeriesByLabel(cpy))
}
return NewMockChunkSeriesSet(cpy...), nil, nil
return NewMockChunkSeriesSet(cpy...)
}
type mockSeriesSet struct {
@ -439,6 +478,8 @@ func (m *mockSeriesSet) At() Series { return m.series[m.idx] }
func (m *mockSeriesSet) Err() error { return nil }
func (m *mockSeriesSet) Warnings() Warnings { return nil }
type mockChunkSeriesSet struct {
idx int
series []ChunkSeries
@ -460,6 +501,8 @@ func (m *mockChunkSeriesSet) At() ChunkSeries { return m.series[m.idx] }
func (m *mockChunkSeriesSet) Err() error { return nil }
func (m *mockChunkSeriesSet) Warnings() Warnings { return nil }
func TestChainSampleIterator(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
@ -571,7 +614,7 @@ func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet {
for i := 0; i < numSeriesSets; i++ {
seriesSets = append(seriesSets, &genericSeriesSetAdapter{makeSeriesSet(numSeries, numSamples)})
}
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, nil, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
}
func benchmarkDrain(seriesSet SeriesSet, b *testing.B) {
@ -603,3 +646,260 @@ func BenchmarkMergeSeriesSet(b *testing.B) {
})
}
}
type mockGenericQuerier struct {
mtx sync.Mutex
closed bool
labelNamesCalls int
labelNamesRequested []string
sortedSeriesRequested []bool
resp []string
warnings Warnings
err error
}
func (m *mockGenericQuerier) Select(b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet {
m.mtx.Lock()
m.sortedSeriesRequested = append(m.sortedSeriesRequested, b)
m.mtx.Unlock()
return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err}
}
func (m *mockGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
m.mtx.Lock()
m.labelNamesRequested = append(m.labelNamesRequested, name)
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
func (m *mockGenericQuerier) LabelNames() ([]string, Warnings, error) {
m.mtx.Lock()
m.labelNamesCalls++
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
func (m *mockGenericQuerier) Close() error {
m.closed = true
return nil
}
type mockGenericSeriesSet struct {
resp []string
warnings Warnings
err error
curr int
}
func (m *mockGenericSeriesSet) Next() bool {
if m.err != nil {
return false
}
if m.curr >= len(m.resp) {
return false
}
m.curr++
return true
}
func (m *mockGenericSeriesSet) Err() error { return m.err }
func (m *mockGenericSeriesSet) Warnings() Warnings { return m.warnings }
func (m *mockGenericSeriesSet) At() Labels {
return mockLabels(m.resp[m.curr-1])
}
type mockLabels string
func (l mockLabels) Labels() labels.Labels {
return labels.FromStrings("test", string(l))
}
func unwrapMockGenericQuerier(t *testing.T, qr genericQuerier) *mockGenericQuerier {
m, ok := qr.(*mockGenericQuerier)
if !ok {
s, ok := qr.(*secondaryQuerier)
testutil.Assert(t, ok, "expected secondaryQuerier got something else")
m, ok = s.genericQuerier.(*mockGenericQuerier)
testutil.Assert(t, ok, "expected mockGenericQuerier got something else")
}
return m
}
func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
var (
errStorage = errors.New("storage error")
warnStorage = errors.New("storage warning")
)
for _, tcase := range []struct {
name string
queriers []genericQuerier
expectedSelectsSeries []labels.Labels
expectedLabels []string
expectedWarnings [3]Warnings
expectedErrs [3]error
}{
{},
{
name: "one successful primary querier",
queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
},
expectedLabels: []string{"a", "b"},
},
{
name: "multiple successful primary queriers",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
&mockGenericQuerier{resp: []string{"b", "c"}, warnings: nil, err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
labels.FromStrings("test", "c"),
},
expectedLabels: []string{"a", "b", "c"},
},
{
name: "one failed primary querier",
queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}},
expectedErrs: [3]error{errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
labels.FromStrings("test", "c"),
},
expectedLabels: []string{"a", "b", "c"},
},
{
name: "one successful primary querier with empty response and successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "b"),
labels.FromStrings("test", "c"),
},
expectedLabels: []string{"b", "c"},
},
{
name: "one failed primary querier with successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{warnings: nil, err: errStorage},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedErrs: [3]error{errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with failed secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a"}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: errStorage}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: errStorage}},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
},
expectedLabels: []string{"a"},
expectedWarnings: [3]Warnings{
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
},
},
{
name: "successful queriers with warnings",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a"}, warnings: []error{warnStorage}, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: []error{warnStorage}, err: nil}},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
},
expectedLabels: []string{"a", "b"},
expectedWarnings: [3]Warnings{
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
q := &mergeGenericQuerier{
queriers: tcase.queriers,
mergeFn: func(l ...Labels) Labels { return l[0] },
}
t.Run("Select", func(t *testing.T) {
res := q.Select(false, nil)
var lbls []labels.Labels
for res.Next() {
lbls = append(lbls, res.At().Labels())
}
testutil.Equals(t, tcase.expectedWarnings[0], res.Warnings())
testutil.Equals(t, tcase.expectedErrs[0], res.Err())
testutil.Assert(t, errors.Is(res.Err(), tcase.expectedErrs[0]), "expected error doesn't match")
testutil.Equals(t, tcase.expectedSelectsSeries, lbls)
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
exp := []bool{true}
if len(q.queriers) == 1 {
exp[0] = false
}
testutil.Equals(t, exp, m.sortedSeriesRequested)
}
})
t.Run("LabelNames", func(t *testing.T) {
res, w, err := q.LabelNames()
testutil.Equals(t, tcase.expectedWarnings[1], w)
testutil.Assert(t, errors.Is(err, tcase.expectedErrs[1]), "expected error doesn't match")
testutil.Equals(t, tcase.expectedLabels, res)
if err != nil {
return
}
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
testutil.Equals(t, 1, m.labelNamesCalls)
}
})
t.Run("LabelValues", func(t *testing.T) {
res, w, err := q.LabelValues("test")
testutil.Equals(t, tcase.expectedWarnings[2], w)
testutil.Assert(t, errors.Is(err, tcase.expectedErrs[2]), "expected error doesn't match")
testutil.Equals(t, tcase.expectedLabels, res)
if err != nil {
return
}
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
testutil.Equals(t, []string{"test"}, m.labelNamesRequested)
}
})
})
}
}

View file

@ -20,13 +20,14 @@ import "github.com/prometheus/prometheus/pkg/labels"
type genericQuerier interface {
baseQuerier
Select(bool, *SelectHints, ...*labels.Matcher) (genericSeriesSet, Warnings, error)
Select(bool, *SelectHints, ...*labels.Matcher) genericSeriesSet
}
type genericSeriesSet interface {
Next() bool
At() Labels
Err() error
Warnings() Warnings
}
type genericSeriesMergeFunc func(...Labels) Labels
@ -55,13 +56,11 @@ type genericQuerierAdapter struct {
cq ChunkQuerier
}
func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) {
func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if q.q != nil {
s, w, err := q.q.Select(sortSeries, hints, matchers...)
return &genericSeriesSetAdapter{s}, w, err
return &genericSeriesSetAdapter{q.q.Select(sortSeries, hints, matchers...)}
}
s, w, err := q.cq.Select(sortSeries, hints, matchers...)
return &genericChunkSeriesSetAdapter{s}, w, err
return &genericChunkSeriesSetAdapter{q.cq.Select(sortSeries, hints, matchers...)}
}
func newGenericQuerierFrom(q Querier) genericQuerier {
@ -84,9 +83,8 @@ func (a *seriesSetAdapter) At() Series {
return a.genericSeriesSet.At().(Series)
}
func (q *querierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
s, w, err := q.genericQuerier.Select(sortSeries, hints, matchers...)
return &seriesSetAdapter{s}, w, err
func (q *querierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet {
return &seriesSetAdapter{q.genericQuerier.Select(sortSeries, hints, matchers...)}
}
type chunkQuerierAdapter struct {
@ -101,9 +99,8 @@ func (a *chunkSeriesSetAdapter) At() ChunkSeries {
return a.genericSeriesSet.At().(ChunkSeries)
}
func (q *chunkQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
s, w, err := q.genericQuerier.Select(sortSeries, hints, matchers...)
return &chunkSeriesSetAdapter{s}, w, err
func (q *chunkQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet {
return &chunkSeriesSetAdapter{q.genericQuerier.Select(sortSeries, hints, matchers...)}
}
type seriesMergerAdapter struct {
@ -129,3 +126,13 @@ func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels {
}
return a.VerticalChunkSeriesMergerFunc(buf...)
}
type noopGenericSeriesSet struct{}
func (noopGenericSeriesSet) Next() bool { return false }
func (noopGenericSeriesSet) At() Labels { return nil }
func (noopGenericSeriesSet) Err() error { return nil }
func (noopGenericSeriesSet) Warnings() Warnings { return nil }

View file

@ -65,7 +65,7 @@ type Querier interface {
// Select returns a set of series that matches the given label matchers.
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error)
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
}
// A ChunkQueryable handles queries against a storage.
@ -82,7 +82,7 @@ type ChunkQuerier interface {
// Select returns a set of series that matches the given label matchers.
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (ChunkSeriesSet, Warnings, error)
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet
}
type baseQuerier interface {
@ -153,7 +153,12 @@ type Appender interface {
type SeriesSet interface {
Next() bool
At() Series
// The error that iteration as failed with.
// When an error occurs, set cannot continue to iterate.
Err() error
// A collection of warnings for the whole set.
// Warnings could be return even iteration has not failed with error.
Warnings() Warnings
}
var emptySeriesSet = errSeriesSet{}
@ -164,12 +169,19 @@ func EmptySeriesSet() SeriesSet {
}
type errSeriesSet struct {
ws Warnings
err error
}
func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() Series { return nil }
func (s errSeriesSet) Err() error { return s.err }
func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() Series { return nil }
func (s errSeriesSet) Err() error { return s.err }
func (s errSeriesSet) Warnings() Warnings { return s.ws }
// ErrSeriesSet returns a series set that wraps an error.
func ErrSeriesSet(err error) SeriesSet {
return errSeriesSet{err: err}
}
// Series exposes a single time series and allows iterating over samples.
type Series interface {
@ -181,7 +193,12 @@ type Series interface {
type ChunkSeriesSet interface {
Next() bool
At() ChunkSeries
// The error that iteration has failed with.
// When an error occurs, set cannot continue to iterate.
Err() error
// A collection of warnings for the whole set.
// Warnings could be return even iteration has not failed with error.
Warnings() Warnings
}
// ChunkSeries exposes a single time series and allows iterating over chunks.

67
storage/lazy.go Normal file
View file

@ -0,0 +1,67 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
type lazySeriesSet struct {
create func() (s genericSeriesSet, ok bool)
set genericSeriesSet
}
func (c *lazySeriesSet) Next() bool {
if c.set != nil {
return c.set.Next()
}
var ok bool
c.set, ok = c.create()
return ok
}
func (c *lazySeriesSet) Err() error {
if c.set != nil {
return c.set.Err()
}
return nil
}
func (c *lazySeriesSet) At() Labels {
if c.set != nil {
return c.set.At()
}
return nil
}
func (c *lazySeriesSet) Warnings() Warnings {
if c.set != nil {
return c.set.Warnings()
}
return nil
}
type warningsOnlySeriesSet Warnings
func (warningsOnlySeriesSet) Next() bool { return false }
func (warningsOnlySeriesSet) Err() error { return nil }
func (warningsOnlySeriesSet) At() Labels { return nil }
func (c warningsOnlySeriesSet) Warnings() Warnings { return Warnings(c) }
type errorOnlySeriesSet struct {
err error
}
func (errorOnlySeriesSet) Next() bool { return false }
func (errorOnlySeriesSet) At() Labels { return nil }
func (s errorOnlySeriesSet) Err() error { return s.err }
func (errorOnlySeriesSet) Warnings() Warnings { return nil }

View file

@ -24,8 +24,8 @@ func NoopQuerier() Querier {
return noopQuerier{}
}
func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (SeriesSet, Warnings, error) {
return NoopSeriesSet(), nil, nil
func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) SeriesSet {
return NoopSeriesSet()
}
func (noopQuerier) LabelValues(string) ([]string, Warnings, error) {
@ -47,8 +47,8 @@ func NoopChunkedQuerier() ChunkQuerier {
return noopChunkQuerier{}
}
func (noopChunkQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
return NoopChunkedSeriesSet(), nil, nil
func (noopChunkQuerier) Select(bool, *SelectHints, ...*labels.Matcher) ChunkSeriesSet {
return NoopChunkedSeriesSet()
}
func (noopChunkQuerier) LabelValues(string) ([]string, Warnings, error) {
@ -76,6 +76,8 @@ func (noopSeriesSet) At() Series { return nil }
func (noopSeriesSet) Err() error { return nil }
func (noopSeriesSet) Warnings() Warnings { return nil }
type noopChunkedSeriesSet struct{}
// NoopChunkedSeriesSet is a ChunkSeriesSet that does nothing.
@ -88,3 +90,5 @@ func (noopChunkedSeriesSet) Next() bool { return false }
func (noopChunkedSeriesSet) At() ChunkSeries { return nil }
func (noopChunkedSeriesSet) Err() error { return nil }
func (noopChunkedSeriesSet) Warnings() Warnings { return nil }

View file

@ -107,7 +107,7 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHi
}
// ToQueryResult builds a QueryResult proto.
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error) {
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, storage.Warnings, error) {
numSamples := 0
resp := &prompb.QueryResult{}
for ss.Next() {
@ -118,7 +118,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
for iter.Next() {
numSamples++
if sampleLimit > 0 && numSamples > sampleLimit {
return nil, HTTPError{
return nil, ss.Warnings(), HTTPError{
msg: fmt.Sprintf("exceeded sample limit (%d)", sampleLimit),
status: http.StatusBadRequest,
}
@ -130,7 +130,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
})
}
if err := iter.Err(); err != nil {
return nil, err
return nil, ss.Warnings(), err
}
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
@ -139,9 +139,9 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
})
}
if err := ss.Err(); err != nil {
return nil, err
return nil, ss.Warnings(), err
}
return resp, nil
return resp, ss.Warnings(), nil
}
// FromQueryResult unpacks and sorts a QueryResult proto.
@ -195,7 +195,7 @@ func StreamChunkedReadResponses(
ss storage.SeriesSet,
sortedExternalLabels []prompb.Label,
maxBytesInFrame int,
) error {
) (storage.Warnings, error) {
var (
chks []prompb.Chunk
lbls []prompb.Label
@ -218,7 +218,7 @@ func StreamChunkedReadResponses(
// TODO(bwplotka): Use ChunkIterator once available in TSDB instead of re-encoding: https://github.com/prometheus/prometheus/pull/5882
chks, err = encodeChunks(iter, chks, maxBytesInFrame-lblsSize)
if err != nil {
return err
return ss.Warnings(), err
}
if len(chks) == 0 {
@ -234,25 +234,25 @@ func StreamChunkedReadResponses(
QueryIndex: queryIndex,
})
if err != nil {
return errors.Wrap(err, "marshal ChunkedReadResponse")
return ss.Warnings(), errors.Wrap(err, "marshal ChunkedReadResponse")
}
if _, err := stream.Write(b); err != nil {
return errors.Wrap(err, "write to stream")
return ss.Warnings(), errors.Wrap(err, "write to stream")
}
chks = chks[:0]
}
if err := iter.Err(); err != nil {
return err
return ss.Warnings(), err
}
}
if err := ss.Err(); err != nil {
return err
return ss.Warnings(), err
}
return nil
return ss.Warnings(), nil
}
// encodeChunks expects iterator to be ready to use (aka iter.Next() called before invoking).
@ -365,6 +365,8 @@ func (e errSeriesSet) Err() error {
return e.err
}
func (e errSeriesSet) Warnings() storage.Warnings { return nil }
// concreteSeriesSet implements storage.SeriesSet.
type concreteSeriesSet struct {
cur int
@ -384,6 +386,8 @@ func (c *concreteSeriesSet) Err() error {
return nil
}
func (c *concreteSeriesSet) Warnings() storage.Warnings { return nil }
// concreteSeries implements storage.Series.
type concreteSeries struct {
labels labels.Labels

View file

@ -71,10 +71,10 @@ type querier struct {
}
// Select implements storage.Querier and uses the given matchers to read series sets from the Client.
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
query, err := ToQuery(q.mint, q.maxt, matchers, hints)
if err != nil {
return nil, nil, err
return storage.ErrSeriesSet(err)
}
remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.remoteName, q.client.url.String())
@ -86,10 +86,10 @@ func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers .
res, err := q.client.Read(q.ctx, query)
if err != nil {
return nil, nil, fmt.Errorf("remote_read: %v", err)
return storage.ErrSeriesSet(fmt.Errorf("remote_read: %v", err))
}
return FromQueryResult(sortSeries, res), nil, nil
return FromQueryResult(sortSeries, res)
}
// LabelValues implements storage.Querier and is a noop.
@ -132,13 +132,9 @@ type externalLabelsQuerier struct {
// Select adds equality matchers for all external labels to the list of matchers
// before calling the wrapped storage.Queryable. The added external labels are
// removed from the returned series sets.
func (q externalLabelsQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q externalLabelsQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
m, added := q.addExternalLabels(matchers)
s, warnings, err := q.Querier.Select(sortSeries, hints, m...)
if err != nil {
return nil, warnings, err
}
return newSeriesSetFilter(s, added), warnings, nil
return newSeriesSetFilter(q.Querier.Select(sortSeries, hints, m...), added)
}
// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
@ -185,7 +181,7 @@ type requiredMatchersQuerier struct {
// Select returns a NoopSeriesSet if the given matchers don't match the label
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
ms := q.requiredMatchers
for _, m := range matchers {
for i, r := range ms {
@ -199,7 +195,7 @@ func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHi
}
}
if len(ms) > 0 {
return storage.NoopSeriesSet(), nil, nil
return storage.NoopSeriesSet()
}
return q.Querier.Select(sortSeries, hints, matchers...)
}

View file

@ -117,10 +117,8 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
},
}
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
have, _, err := q.Select(false, nil, matchers...)
if err != nil {
t.Error(err)
}
have := q.Select(false, nil, matchers...)
if !reflect.DeepEqual(want, have) {
t.Errorf("expected series set %+v, got %+v", want, have)
}
@ -218,16 +216,12 @@ func TestSeriesSetFilter(t *testing.T) {
},
}
for i, tc := range tests {
for _, tc := range tests {
filtered := newSeriesSetFilter(FromQueryResult(true, tc.in), tc.toRemove)
have, err := ToQueryResult(filtered, 1e6)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(have, tc.expected) {
t.Fatalf("%d. unexpected labels; want %v, got %v", i, tc.expected, have)
}
act, ws, err := ToQueryResult(filtered, 1e6)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
testutil.Equals(t, tc.expected, act)
}
}
@ -242,8 +236,8 @@ type mockSeriesSet struct {
storage.SeriesSet
}
func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return mockSeriesSet{}, nil, nil
func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return mockSeriesSet{}
}
func TestPreferLocalStorageFilter(t *testing.T) {
@ -398,10 +392,8 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
requiredMatchers: test.requiredMatchers,
}
have, _, err := q.Select(false, nil, test.matchers...)
if err != nil {
t.Error(err)
}
have := q.Select(false, nil, test.matchers...)
if want := test.seriesSet; want != have {
t.Errorf("%d. expected series set %+v, got %+v", i, want, have)
}

View file

@ -132,6 +132,9 @@ func (s *Storage) StartTime() (int64, error) {
// Querier returns a storage.MergeQuerier combining the remote client queriers
// of each configured remote read endpoint.
// Returned querier will never return error as all queryables are assumed best effort.
// Additionally all returned queriers ensure that its Select's SeriesSets have ready data after first `Next` invoke.
// This is because Prometheus (fanout and secondary queries) can't handle the stream failing half way through by design.
func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
s.mtx.Lock()
queryables := s.queryables

112
storage/secondary.go Normal file
View file

@ -0,0 +1,112 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"sync"
"github.com/prometheus/prometheus/pkg/labels"
)
// secondaryQuerier is a wrapper that allows a querier to be treated in a best effort manner.
// This means that an error on any method returned by Querier except Close will be returned as a warning,
// and the result will be empty.
//
// Additionally, Querier ensures that if ANY SeriesSet returned by this querier's Select failed on an initial Next,
// All other SeriesSet will be return no response as well. This ensures consistent partial response strategy, where you
// have either full results or none from each secondary Querier.
// NOTE: This works well only for implementations that only fail during first Next() (e.g fetch from network). If implementation fails
// during further iterations, set will panic. If Select is invoked after first Next of any returned SeriesSet, querier will panic.
//
// Not go-routine safe.
// NOTE: Prometheus treats all remote storages as secondary / best effort.
type secondaryQuerier struct {
genericQuerier
once sync.Once
done bool
asyncSets []genericSeriesSet
}
func newSecondaryQuerierFrom(q Querier) genericQuerier {
return &secondaryQuerier{genericQuerier: newGenericQuerierFrom(q)}
}
func newSecondaryQuerierFromChunk(cq ChunkQuerier) genericQuerier {
return &secondaryQuerier{genericQuerier: newGenericQuerierFromChunk(cq)}
}
func (s *secondaryQuerier) LabelValues(name string) ([]string, Warnings, error) {
vals, w, err := s.genericQuerier.LabelValues(name)
if err != nil {
return nil, append([]error{err}, w...), nil
}
return vals, w, nil
}
func (s *secondaryQuerier) LabelNames() ([]string, Warnings, error) {
names, w, err := s.genericQuerier.LabelNames()
if err != nil {
return nil, append([]error{err}, w...), nil
}
return names, w, nil
}
func (s *secondaryQuerier) createFn(asyncSet genericSeriesSet) func() (genericSeriesSet, bool) {
s.asyncSets = append(s.asyncSets, asyncSet)
curr := len(s.asyncSets) - 1
return func() (genericSeriesSet, bool) {
s.once.Do(func() {
// At first create invocation we iterate over all sets and ensure its Next() returns some value without
// errors. This is to ensure we support consistent partial failures.
for i, set := range s.asyncSets {
if set.Next() {
continue
}
ws := set.Warnings()
// Failed set.
if err := set.Err(); err != nil {
ws = append([]error{err}, ws...)
// Promote the warnings to the current one.
s.asyncSets[curr] = warningsOnlySeriesSet(ws)
// One of the sets failed, ensure rest of the sets returns nothing. (All or nothing logic).
for i := range s.asyncSets {
if curr != i {
s.asyncSets[i] = noopGenericSeriesSet{}
}
}
break
}
// Exhausted set.
s.asyncSets[i] = warningsOnlySeriesSet(ws)
}
s.done = true
})
switch s.asyncSets[curr].(type) {
case warningsOnlySeriesSet, noopGenericSeriesSet:
return s.asyncSets[curr], false
default:
return s.asyncSets[curr], true
}
}
}
func (s *secondaryQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if s.done {
panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done")
}
return &lazySeriesSet{create: s.createFn(s.genericQuerier.Select(sortSeries, hints, matchers...))}
}

View file

@ -203,9 +203,7 @@ func TestCorruptedChunk(t *testing.T) {
querier, err := NewBlockQuerier(b, 0, 1)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, querier.Close()) }()
set, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
set := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
// Check query err.
testutil.Equals(t, false, set.Next())

View file

@ -617,18 +617,7 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) {
err = merr.Err()
}()
ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
if err != nil {
return err
}
if len(ws) > 0 {
var merr tsdb_errors.MultiError
for _, w := range ws {
merr.Add(w)
}
return merr.Err()
}
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
for ss.Next() {
series := ss.At()
@ -643,6 +632,14 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) {
}
}
if ws := ss.Warnings(); len(ws) > 0 {
var merr tsdb_errors.MultiError
for _, w := range ws {
merr.Add(w)
}
return merr.Err()
}
if ss.Err() != nil {
return ss.Err()
}

View file

@ -67,15 +67,12 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func()
// query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample {
ss, ws, err := q.Select(false, nil, matchers...)
ss := q.Select(false, nil, matchers...)
defer func() {
testutil.Ok(t, q.Close())
}()
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
result := map[string][]tsdbutil.Sample{}
for ss.Next() {
series := ss.At()
@ -95,6 +92,7 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
result[name] = samples
}
testutil.Ok(t, ss.Err())
testutil.Equals(t, 0, len(ss.Warnings()))
return result
}
@ -315,9 +313,7 @@ Outer:
q, err := db.Querier(context.TODO(), 0, numSamples)
testutil.Ok(t, err)
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
@ -333,6 +329,7 @@ Outer:
testutil.Equals(t, eok, rok)
if !eok {
testutil.Equals(t, 0, len(res.Warnings()))
continue Outer
}
sexp := expss.At()
@ -491,10 +488,7 @@ func TestDB_Snapshot(t *testing.T) {
defer func() { testutil.Ok(t, querier.Close()) }()
// sum values
seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
@ -505,6 +499,7 @@ func TestDB_Snapshot(t *testing.T) {
testutil.Ok(t, series.Err())
}
testutil.Ok(t, seriesSet.Err())
testutil.Equals(t, 0, len(seriesSet.Warnings()))
testutil.Equals(t, 1000.0, sum)
}
@ -546,10 +541,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
defer func() { testutil.Ok(t, querier.Close()) }()
// Sum values.
seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
@ -560,6 +552,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
testutil.Ok(t, series.Err())
}
testutil.Ok(t, seriesSet.Err())
testutil.Equals(t, 0, len(seriesSet.Warnings()))
// Since we snapshotted with MaxTime - 10, so expect 10 less samples.
testutil.Equals(t, 1000.0-10, sum)
@ -618,9 +611,7 @@ Outer:
testutil.Ok(t, err)
defer func() { testutil.Ok(t, q.Close()) }()
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
@ -641,6 +632,7 @@ Outer:
testutil.Equals(t, eok, rok)
if !eok {
testutil.Equals(t, 0, len(res.Warnings()))
continue Outer
}
sexp := expss.At()
@ -792,10 +784,7 @@ func TestDB_e2e(t *testing.T) {
q, err := db.Querier(context.TODO(), mint, maxt)
testutil.Ok(t, err)
ss, ws, err := q.Select(false, nil, qry.ms...)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
ss := q.Select(false, nil, qry.ms...)
result := map[string][]tsdbutil.Sample{}
for ss.Next() {
@ -810,6 +799,7 @@ func TestDB_e2e(t *testing.T) {
}
testutil.Ok(t, ss.Err())
testutil.Equals(t, 0, len(ss.Warnings()))
testutil.Equals(t, expected, result)
q.Close()
@ -968,9 +958,7 @@ func TestTombstoneClean(t *testing.T) {
testutil.Ok(t, err)
defer q.Close()
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
@ -1004,6 +992,7 @@ func TestTombstoneClean(t *testing.T) {
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
}
testutil.Equals(t, 0, len(res.Warnings()))
for _, b := range db.Blocks() {
testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones)
@ -1306,19 +1295,17 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
defer func() { testutil.Ok(t, q.Close()) }()
for _, c := range cases {
ss, ws, err := q.Select(false, nil, c.selector...)
ss := q.Select(false, nil, c.selector...)
lres, _, ws, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
lres, _, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, c.series, lres)
}
}
// expandSeriesSet returns the raw labels in the order they are retrieved from
// the series set and the samples keyed by Labels().String().
func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, error) {
func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, storage.Warnings, error) {
resultLabels := []labels.Labels{}
resultSamples := map[string][]sample{}
for ss.Next() {
@ -1332,7 +1319,7 @@ func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample
resultLabels = append(resultLabels, series.Labels())
resultSamples[series.Labels().String()] = samples
}
return resultLabels, resultSamples, ss.Err()
return resultLabels, resultSamples, ss.Warnings(), ss.Err()
}
func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
@ -2503,9 +2490,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
defer func() { testutil.Ok(t, querier.Close()) }()
// Sum the values.
seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
sum := 0.0
for seriesSet.Next() {
@ -2517,6 +2502,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
testutil.Ok(t, series.Err())
}
testutil.Ok(t, seriesSet.Err())
testutil.Equals(t, 0, len(seriesSet.Warnings()))
testutil.Equals(t, 1000.0, sum)
}
@ -2568,11 +2554,11 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
testutil.Ok(t, err)
defer querier.Close()
ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
_, seriesSet, ws, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
values := map[float64]struct{}{}
for _, series := range seriesSet {
values[series[len(series)-1].v] = struct{}{}
@ -2608,16 +2594,16 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
defer querierAfterAddButBeforeCommit.Close()
// None of the queriers should return anything after the Add but before the commit.
ss, _, err := querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss)
ss := querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
_, seriesSet, ws, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
testutil.Equals(t, map[string][]sample{}, seriesSet)
ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
ss = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
_, seriesSet, ws, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
testutil.Equals(t, map[string][]sample{}, seriesSet)
// This commit is after the queriers are created, so should not be returned.
@ -2625,17 +2611,17 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
testutil.Ok(t, err)
// Nothing returned for querier created before the Add.
ss, _, err = querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
ss = querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
_, seriesSet, ws, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
testutil.Equals(t, map[string][]sample{}, seriesSet)
// Series exists but has no samples for querier created after Add.
ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
ss = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
_, seriesSet, ws, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
testutil.Equals(t, map[string][]sample{`{foo="bar"}`: {}}, seriesSet)
querierAfterCommit, err := db.Querier(context.Background(), 0, 1000000)
@ -2643,10 +2629,10 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
defer querierAfterCommit.Close()
// Samples are returned for querier created after Commit.
ss, _, err = querierAfterCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
ss = querierAfterCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
_, seriesSet, ws, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
testutil.Equals(t, map[string][]sample{`{foo="bar"}`: {{t: 0, v: 0}}}, seriesSet)
}

View file

@ -562,9 +562,7 @@ func TestHeadDeleteSimple(t *testing.T) {
for _, h := range []*Head{head, reloadedHead} {
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
testutil.Ok(t, err)
actSeriesSet, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
actSeriesSet := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
testutil.Ok(t, q.Close())
expSeriesSet := newMockSeriesSet([]storage.Series{
newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample {
@ -583,6 +581,8 @@ func TestHeadDeleteSimple(t *testing.T) {
if !eok {
testutil.Ok(t, h.Close())
testutil.Ok(t, actSeriesSet.Err())
testutil.Equals(t, 0, len(actSeriesSet.Warnings()))
continue Outer
}
expSeries := expSeriesSet.At()
@ -623,13 +623,15 @@ func TestDeleteUntilCurMax(t *testing.T) {
// Test the series returns no samples. The series is cleared only after compaction.
q, err := NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err)
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Assert(t, res.Next(), "series is not present")
s := res.At()
it := s.Iterator()
testutil.Assert(t, !it.Next(), "expected no samples")
for res.Next() {
}
testutil.Ok(t, res.Err())
testutil.Equals(t, 0, len(res.Warnings()))
// Add again and test for presence.
app = hb.Appender()
@ -638,15 +640,17 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Ok(t, app.Commit())
q, err = NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err)
res, ws, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Assert(t, res.Next(), "series don't exist")
exps := res.At()
it = exps.Iterator()
resSamples, err := expandSeriesIterator(it)
testutil.Ok(t, err)
testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, resSamples)
for res.Next() {
}
testutil.Ok(t, res.Err())
testutil.Equals(t, 0, len(res.Warnings()))
}
func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
@ -807,9 +811,7 @@ func TestDelete_e2e(t *testing.T) {
q, err := NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err)
defer q.Close()
ss, ws, err := q.Select(true, nil, del.ms...)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
ss := q.Select(true, nil, del.ms...)
// Build the mockSeriesSet.
matchedSeries := make([]storage.Series, 0, len(matched))
for _, m := range matched {
@ -850,6 +852,8 @@ func TestDelete_e2e(t *testing.T) {
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
}
testutil.Ok(t, ss.Err())
testutil.Equals(t, 0, len(ss.Warnings()))
}
}
}
@ -1118,11 +1122,12 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
testutil.Ok(t, err)
defer q.Close()
ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
testutil.Equals(t, true, ss.Next())
for ss.Next() {
}
testutil.Ok(t, ss.Err())
testutil.Equals(t, 0, len(ss.Warnings()))
}
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
@ -1148,11 +1153,9 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
testutil.Ok(t, err)
defer q.Close()
ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
testutil.Equals(t, false, ss.Next())
testutil.Equals(t, 0, len(ss.Warnings()))
// Truncate again, this time the series should be deleted
testutil.Ok(t, h.Truncate(2050))
@ -1434,11 +1437,11 @@ func TestMemSeriesIsolation(t *testing.T) {
testutil.Ok(t, err)
defer querier.Close()
ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
_, seriesSet, ws, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
for _, series := range seriesSet {
return int(series[len(series)-1].v)
}
@ -1790,8 +1793,9 @@ func testHeadSeriesChunkRace(t *testing.T) {
h.gc()
wg.Done()
}()
ss, _, err := q.Select(false, nil, matcher)
testutil.Ok(t, err)
ss := q.Select(false, nil, matcher)
for ss.Next() {
}
testutil.Ok(t, ss.Err())
wg.Wait()
}

View file

@ -85,9 +85,9 @@ func (q *querier) lvals(qs []storage.Querier, n string) ([]string, storage.Warni
return mergeStrings(s1, s2), ws, nil
}
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if len(q.blocks) == 0 {
return storage.EmptySeriesSet(), nil, nil
return storage.EmptySeriesSet()
}
if len(q.blocks) == 1 {
// Sorting Head series is slow, and unneeded when only the
@ -96,18 +96,12 @@ func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*lab
}
ss := make([]storage.SeriesSet, len(q.blocks))
var ws storage.Warnings
for i, b := range q.blocks {
// We have to sort if blocks > 1 as MergedSeriesSet requires it.
s, w, err := b.Select(true, hints, ms...)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
ss[i] = s
ss[i] = b.Select(true, hints, ms...)
}
return NewMergedSeriesSet(ss), ws, nil
return NewMergedSeriesSet(ss)
}
func (q *querier) Close() error {
@ -125,31 +119,23 @@ type verticalQuerier struct {
querier
}
func (q *verticalQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *verticalQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
return q.sel(sortSeries, hints, q.blocks, ms)
}
func (q *verticalQuerier) sel(sortSeries bool, hints *storage.SelectHints, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *verticalQuerier) sel(sortSeries bool, hints *storage.SelectHints, qs []storage.Querier, ms []*labels.Matcher) storage.SeriesSet {
if len(qs) == 0 {
return storage.EmptySeriesSet(), nil, nil
return storage.EmptySeriesSet()
}
if len(qs) == 1 {
return qs[0].Select(sortSeries, hints, ms...)
}
l := len(qs) / 2
var ws storage.Warnings
a, w, err := q.sel(sortSeries, hints, qs[:l], ms)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
b, w, err := q.sel(sortSeries, hints, qs[l:], ms)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
return newMergedVerticalSeriesSet(a, b), ws, nil
return newMergedVerticalSeriesSet(
q.sel(sortSeries, hints, qs[:l], ms),
q.sel(sortSeries, hints, qs[l:], ms),
)
}
// NewBlockQuerier returns a querier against the reader.
@ -189,7 +175,7 @@ type blockQuerier struct {
mint, maxt int64
}
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
var base storage.DeprecatedChunkSeriesSet
var err error
@ -199,7 +185,7 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ..
base, err = LookupChunkSeries(q.index, q.tombstones, ms...)
}
if err != nil {
return nil, nil, err
return storage.ErrSeriesSet(err)
}
mint := q.mint
@ -218,7 +204,7 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ..
mint: mint,
maxt: maxt,
}, nil, nil
}
}
func (q *blockQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
@ -501,6 +487,14 @@ func (s *mergedSeriesSet) Err() error {
return s.err
}
func (s *mergedSeriesSet) Warnings() storage.Warnings {
var ws storage.Warnings
for _, ss := range s.all {
ws = append(ws, ss.Warnings()...)
}
return ws
}
// nextAll is to call Next() for all SeriesSet.
// Because the order of the SeriesSet slice will affect the results,
// we need to use an buffer slice to hold the order.
@ -509,7 +503,10 @@ func (s *mergedSeriesSet) nextAll() {
for _, ss := range s.all {
if ss.Next() {
s.buf = append(s.buf, ss)
} else if ss.Err() != nil {
continue
}
if ss.Err() != nil {
s.done = true
s.err = ss.Err()
break
@ -623,6 +620,13 @@ func (s *mergedVerticalSeriesSet) Err() error {
return s.b.Err()
}
func (s *mergedVerticalSeriesSet) Warnings() storage.Warnings {
var ws storage.Warnings
ws = append(ws, s.a.Warnings()...)
ws = append(ws, s.b.Warnings()...)
return ws
}
func (s *mergedVerticalSeriesSet) compare() int {
if s.adone {
return 1
@ -848,8 +852,9 @@ func (s *blockSeriesSet) Next() bool {
return false
}
func (s *blockSeriesSet) At() storage.Series { return s.cur }
func (s *blockSeriesSet) Err() error { return s.err }
func (s *blockSeriesSet) At() storage.Series { return s.cur }
func (s *blockSeriesSet) Err() error { return s.err }
func (s *blockSeriesSet) Warnings() storage.Warnings { return nil }
// chunkSeries is a series that is backed by a sequence of chunks holding
// time series data.

View file

@ -155,8 +155,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ss, _, err := q.Select(sorted, nil, matcher)
testutil.Ok(b, err)
ss := q.Select(sorted, nil, matcher)
for ss.Next() {
}
testutil.Ok(b, ss.Err())

View file

@ -39,12 +39,14 @@ import (
type mockSeriesSet struct {
next func() bool
series func() storage.Series
ws func() storage.Warnings
err func() error
}
func (m *mockSeriesSet) Next() bool { return m.next() }
func (m *mockSeriesSet) At() storage.Series { return m.series() }
func (m *mockSeriesSet) Err() error { return m.err() }
func (m *mockSeriesSet) Next() bool { return m.next() }
func (m *mockSeriesSet) At() storage.Series { return m.series() }
func (m *mockSeriesSet) Err() error { return m.err() }
func (m *mockSeriesSet) Warnings() storage.Warnings { return m.ws() }
func newMockSeriesSet(list []storage.Series) *mockSeriesSet {
i := -1
@ -57,11 +59,11 @@ func newMockSeriesSet(list []storage.Series) *mockSeriesSet {
return list[i]
},
err: func() error { return nil },
ws: func() storage.Warnings { return nil },
}
}
func TestMergedSeriesSet(t *testing.T) {
cases := []struct {
// The input sets in order (samples in series in b are strictly
// after those in a).
@ -373,15 +375,14 @@ Outer:
maxt: c.maxt,
}
res, ws, err := querier.Select(false, nil, c.ms...)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
res := querier.Select(false, nil, c.ms...)
for {
eok, rok := c.exp.Next(), res.Next()
testutil.Equals(t, eok, rok)
if !eok {
testutil.Equals(t, 0, len(res.Warnings()))
continue Outer
}
sexp := c.exp.At()
@ -536,15 +537,14 @@ Outer:
maxt: c.maxt,
}
res, ws, err := querier.Select(false, nil, c.ms...)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
res := querier.Select(false, nil, c.ms...)
for {
eok, rok := c.exp.Next(), res.Next()
testutil.Equals(t, eok, rok)
if !eok {
testutil.Equals(t, 0, len(res.Warnings()))
continue Outer
}
sexp := c.exp.At()
@ -1654,7 +1654,7 @@ func BenchmarkQuerySeek(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
ss, ws, err := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
for ss.Next() {
it := ss.At().Iterator()
for t := mint; t <= maxt; t++ {
@ -1664,7 +1664,7 @@ func BenchmarkQuerySeek(b *testing.B) {
}
testutil.Ok(b, ss.Err())
testutil.Ok(b, err)
testutil.Equals(b, 0, len(ws))
testutil.Equals(b, 0, len(ss.Warnings()))
})
}
}
@ -1792,9 +1792,11 @@ func BenchmarkSetMatcher(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
_, ws, err := que.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
testutil.Ok(b, err)
testutil.Equals(b, 0, len(ws))
ss := que.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
for ss.Next() {
}
testutil.Ok(b, ss.Err())
testutil.Equals(b, 0, len(ss.Warnings()))
}
})
}
@ -2252,9 +2254,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
ss, ws, err := q.Select(false, nil, selectors...)
testutil.Ok(b, err)
testutil.Equals(b, 0, len(ws))
ss := q.Select(false, nil, selectors...)
var actualExpansions int
for ss.Next() {
s := ss.At()
@ -2264,6 +2264,8 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
}
actualExpansions++
}
testutil.Ok(b, ss.Err())
testutil.Equals(b, 0, len(ss.Warnings()))
testutil.Equals(b, expExpansions, actualExpansions)
testutil.Ok(b, ss.Err())
}

View file

@ -596,13 +596,8 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
}
var sets []storage.SeriesSet
var warnings storage.Warnings
for _, mset := range matcherSets {
s, wrn, err := q.Select(false, nil, mset...)
warnings = append(warnings, wrn...)
if err != nil {
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer}
}
s := q.Select(false, nil, mset...)
sets = append(sets, s)
}
@ -611,6 +606,8 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
for set.Next() {
metrics = append(metrics, set.At().Labels())
}
warnings := set.Warnings()
if set.Err() != nil {
return apiFuncResult{nil, &apiError{errorExec, set.Err()}, warnings, closer}
}
@ -1226,12 +1223,9 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
return
}
for i, query := range req.Queries {
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error {
ws, err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error) {
// The streaming API has to provide the series sorted.
set, _, err := querier.Select(true, hints, filteredMatchers...)
if err != nil {
return err
}
set := querier.Select(true, hints, filteredMatchers...)
return remote.StreamChunkedReadResponses(
remote.NewChunkedWriter(w, f),
@ -1241,6 +1235,9 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
api.remoteReadMaxBytesInFrame,
)
})
for _, w := range ws {
level.Warn(api.logger).Log("msg", "warnings on remote read query", "err", w.Error())
}
if err != nil {
if httpErr, ok := err.(remote.HTTPError); ok {
http.Error(w, httpErr.Error(), httpErr.Status())
@ -1259,22 +1256,26 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
Results: make([]*prompb.QueryResult, len(req.Queries)),
}
for i, query := range req.Queries {
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error {
set, _, err := querier.Select(false, hints, filteredMatchers...)
if err != nil {
return err
}
ws, err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error) {
set := querier.Select(false, hints, filteredMatchers...)
resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadSampleLimit)
var (
ws storage.Warnings
err error
)
resp.Results[i], ws, err = remote.ToQueryResult(set, api.remoteReadSampleLimit)
if err != nil {
return err
return ws, err
}
for _, ts := range resp.Results[i].Timeseries {
ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels)
}
return nil
return ws, nil
})
for _, w := range ws {
level.Warn(api.logger).Log("msg", "warnings on remote read query", "err", w.Error())
}
if err != nil {
if httpErr, ok := err.(remote.HTTPError); ok {
http.Error(w, httpErr.Error(), httpErr.Status())
@ -1318,15 +1319,15 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe
return filteredMatchers, nil
}
func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error) error {
func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error)) (storage.Warnings, error) {
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
if err != nil {
return err
return nil, err
}
querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
if err != nil {
return err
return nil, err
}
defer func() {
if err := querier.Close(); err != nil {

View file

@ -517,12 +517,8 @@ func setupRemote(s storage.Storage) *httptest.Server {
}
defer querier.Close()
set, _, err := querier.Select(false, hints, matchers...)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp.Results[i], err = remote.ToQueryResult(set, 1e6)
set := querier.Select(false, hints, matchers...)
resp.Results[i], _, err = remote.ToQueryResult(set, 1e6)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return

View file

@ -95,16 +95,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
var sets []storage.SeriesSet
for _, mset := range matcherSets {
s, wrns, err := q.Select(false, hints, mset...)
if wrns != nil {
level.Debug(h.logger).Log("msg", "Federation select returned warnings", "warnings", wrns)
federationWarnings.Add(float64(len(wrns)))
}
if err != nil {
federationErrors.Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
s := q.Select(false, hints, mset...)
sets = append(sets, s)
}
@ -142,6 +133,10 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
Point: promql.Point{T: t, V: v},
})
}
if ws := set.Warnings(); len(ws) > 0 {
level.Debug(h.logger).Log("msg", "Federation select returned warnings", "warnings", ws)
federationWarnings.Add(float64(len(ws)))
}
if set.Err() != nil {
federationErrors.Inc()
http.Error(w, set.Err().Error(), http.StatusInternalServerError)