Return warnings on a remote read fail (#4832)

Signed-off-by: Mark Knapp <mknapp@hudson-trading.com>
This commit is contained in:
mknapphrt 2018-11-30 09:27:12 -05:00 committed by Brian Brazil
parent 77ee41084f
commit f0e9196dca
18 changed files with 349 additions and 209 deletions

View file

@ -24,6 +24,10 @@ and one of the following HTTP response codes:
Other non-`2xx` codes may be returned for errors occurring before the API
endpoint is reached.
An array of warnings may be returned if there are errors that do
not inhibit the request execution. All of the data that was successfully
collected will be returned in the data field.
The JSON response envelope format is as follows:
```
@ -34,7 +38,11 @@ The JSON response envelope format is as follows:
// Only set if status is "error". The data field may still hold
// additional data.
"errorType": "<string>",
"error": "<string>"
"error": "<string>",
// Only if there were warnings while executing the request.
// There will still be data in the data field.
"warnings": ["<string>"]
}
```

View file

@ -111,8 +111,9 @@ type MatrixSelector struct {
Offset time.Duration
LabelMatchers []*labels.Matcher
// The series are populated at query preparation time.
series []storage.Series
// The unexpanded seriesSet populated at query preparation time.
unexpandedSeriesSet storage.SeriesSet
series []storage.Series
}
// NumberLiteral represents a number.
@ -144,8 +145,9 @@ type VectorSelector struct {
Offset time.Duration
LabelMatchers []*labels.Matcher
// The series are populated at query preparation time.
series []storage.Series
// The unexpanded seriesSet populated at query preparation time.
unexpandedSeriesSet storage.SeriesSet
series []storage.Series
}
func (e *AggregateExpr) Type() ValueType { return ValueTypeVector }

View file

@ -154,8 +154,8 @@ func (q *query) Exec(ctx context.Context) *Result {
span.SetTag(queryTag, q.stmt.String())
}
res, err := q.ng.exec(ctx, q)
return &Result{Err: err, Value: res}
res, err, warnings := q.ng.exec(ctx, q)
return &Result{Err: err, Value: res, Warnings: warnings}
}
// contextDone returns an error if the context was canceled or timed out.
@ -332,7 +332,7 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error, storage.Warnings) {
ng.metrics.currentQueries.Inc()
defer ng.metrics.currentQueries.Dec()
@ -345,7 +345,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
if err := ng.gate.Start(ctx); err != nil {
return nil, contextErr(err, "query queue")
return nil, contextErr(err, "query queue"), nil
}
defer ng.gate.Done()
@ -361,14 +361,14 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
// The base context might already be canceled on the first iteration (e.g. during shutdown).
if err := contextDone(ctx, env); err != nil {
return nil, err
return nil, err, nil
}
switch s := q.Statement().(type) {
case *EvalStmt:
return ng.execEvalStmt(ctx, q, s)
case testStmt:
return nil, s(ctx)
return nil, s(ctx), nil
}
panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
@ -383,9 +383,9 @@ func durationMilliseconds(d time.Duration) int64 {
}
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) {
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error, storage.Warnings) {
prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
querier, err := ng.populateSeries(ctxPrepare, query.queryable, s)
querier, err, warnings := ng.populateSeries(ctxPrepare, query.queryable, s)
prepareSpanTimer.Finish()
// XXX(fabxc): the querier returned by populateSeries might be instantiated
@ -396,7 +396,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
}
if err != nil {
return nil, err
return nil, err, warnings
}
evalSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
@ -413,7 +413,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
}
val, err := evaluator.Eval(s.Expr)
if err != nil {
return nil, err
return nil, err, warnings
}
evalSpanTimer.Finish()
@ -432,11 +432,11 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
// timestamp as that is when we ran the evaluation.
vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}}
}
return vector, nil
return vector, nil, warnings
case ValueTypeScalar:
return Scalar{V: mat[0].Points[0].V, T: start}, nil
return Scalar{V: mat[0].Points[0].V, T: start}, nil, warnings
case ValueTypeMatrix:
return mat, nil
return mat, nil, warnings
default:
panic(fmt.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type()))
}
@ -454,7 +454,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
}
val, err := evaluator.Eval(s.Expr)
if err != nil {
return nil, err
return nil, err, warnings
}
evalSpanTimer.Finish()
@ -465,7 +465,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
query.matrix = mat
if err := contextDone(ctx, "expression evaluation"); err != nil {
return nil, err
return nil, err, warnings
}
// TODO(fabxc): order ensured by storage?
@ -474,10 +474,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
sort.Sort(mat)
sortSpanTimer.Finish()
return mat, nil
return mat, nil, warnings
}
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) {
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error, storage.Warnings) {
var maxOffset time.Duration
Inspect(s.Expr, func(node Node, _ []Node) error {
switch n := node.(type) {
@ -503,11 +503,14 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End))
if err != nil {
return nil, err
return nil, err, nil
}
var warnings storage.Warnings
Inspect(s.Expr, func(node Node, path []Node) error {
var set storage.SeriesSet
var wrn storage.Warnings
params := &storage.SelectParams{
Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End),
@ -524,17 +527,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
params.End = params.End - offsetMilliseconds
}
set, err = querier.Select(params, n.LabelMatchers...)
set, err, wrn = querier.Select(params, n.LabelMatchers...)
warnings = append(warnings, wrn...)
if err != nil {
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
return err
}
n.series, err = expandSeriesSet(ctx, set)
if err != nil {
// TODO(fabxc): use multi-error.
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
return err
}
n.unexpandedSeriesSet = set
case *MatrixSelector:
params.Func = extractFuncFromPath(path)
@ -547,20 +546,17 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
params.End = params.End - offsetMilliseconds
}
set, err = querier.Select(params, n.LabelMatchers...)
set, err, wrn = querier.Select(params, n.LabelMatchers...)
warnings = append(warnings, wrn...)
if err != nil {
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
return err
}
n.series, err = expandSeriesSet(ctx, set)
if err != nil {
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
return err
}
n.unexpandedSeriesSet = set
}
return nil
})
return querier, err
return querier, err, warnings
}
// extractFuncFromPath walks up the path and searches for the first instance of
@ -582,6 +578,30 @@ func extractFuncFromPath(p []Node) string {
return extractFuncFromPath(p[:len(p)-1])
}
func checkForSeriesSetExpansion(expr Expr, ctx context.Context) error {
switch e := expr.(type) {
case *MatrixSelector:
if e.series == nil {
series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet)
if err != nil {
panic(err)
} else {
e.series = series
}
}
case *VectorSelector:
if e.series == nil {
series, err := expandSeriesSet(ctx, e.unexpandedSeriesSet)
if err != nil {
panic(err)
} else {
e.series = series
}
}
}
return nil
}
func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) {
for it.Next() {
select {
@ -887,6 +907,9 @@ func (ev *evaluator) eval(expr Expr) Value {
}
sel := e.Args[matrixArgIndex].(*MatrixSelector)
if err := checkForSeriesSetExpansion(sel, ev.ctx); err != nil {
ev.error(err)
}
mat := make(Matrix, 0, len(sel.series)) // Output matrix.
offset := durationMilliseconds(sel.Offset)
selRange := durationMilliseconds(sel.Range)
@ -1018,6 +1041,9 @@ func (ev *evaluator) eval(expr Expr) Value {
})
case *VectorSelector:
if err := checkForSeriesSetExpansion(e, ev.ctx); err != nil {
ev.error(err)
}
mat := make(Matrix, 0, len(e.series))
it := storage.NewBuffer(durationMilliseconds(LookbackDelta))
for i, s := range e.series {
@ -1058,6 +1084,10 @@ func (ev *evaluator) eval(expr Expr) Value {
// vectorSelector evaluates a *VectorSelector expression.
func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector {
if err := checkForSeriesSetExpansion(node, ev.ctx); err != nil {
ev.error(err)
}
var (
vec = make(Vector, 0, len(node.series))
)
@ -1127,17 +1157,20 @@ func putPointSlice(p []Point) {
// matrixSelector evaluates a *MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
if err := checkForSeriesSetExpansion(node, ev.ctx); err != nil {
ev.error(err)
}
var (
offset = durationMilliseconds(node.Offset)
maxt = ev.startTimestamp - offset
mint = maxt - durationMilliseconds(node.Range)
matrix = make(Matrix, 0, len(node.series))
err error
)
it := storage.NewBuffer(durationMilliseconds(node.Range))
for i, s := range node.series {
if err = contextDone(ev.ctx, "expression evaluation"); err != nil {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err)
}
it.Reset(s.Iterator())

View file

@ -169,8 +169,8 @@ type errQuerier struct {
err error
}
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) {
return errSeriesSet{err: q.err}, q.err
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
return errSeriesSet{err: q.err}, q.err, nil
}
func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil }
func (*errQuerier) LabelNames() ([]string, error) { return nil, nil }
@ -425,7 +425,8 @@ load 10s
MaxSamples: 1,
Result: Result{
nil,
Scalar{V: 1, T: 1000}},
Scalar{V: 1, T: 1000},
nil},
Start: time.Unix(1, 0),
},
{
@ -434,6 +435,7 @@ load 10s
Result: Result{
ErrTooManySamples(env),
nil,
nil,
},
Start: time.Unix(1, 0),
},
@ -443,6 +445,7 @@ load 10s
Result: Result{
ErrTooManySamples(env),
nil,
nil,
},
Start: time.Unix(1, 0),
},
@ -455,6 +458,7 @@ load 10s
Sample{Point: Point{V: 1, T: 1000},
Metric: labels.FromStrings("__name__", "metric")},
},
nil,
},
Start: time.Unix(1, 0),
},
@ -467,6 +471,7 @@ load 10s
Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}},
Metric: labels.FromStrings("__name__", "metric")},
},
nil,
},
Start: time.Unix(10, 0),
},
@ -476,6 +481,7 @@ load 10s
Result: Result{
ErrTooManySamples(env),
nil,
nil,
},
Start: time.Unix(10, 0),
},
@ -489,6 +495,7 @@ load 10s
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
Metric: labels.FromStrings()},
},
nil,
},
Start: time.Unix(0, 0),
End: time.Unix(2, 0),
@ -500,6 +507,7 @@ load 10s
Result: Result{
ErrTooManySamples(env),
nil,
nil,
},
Start: time.Unix(0, 0),
End: time.Unix(2, 0),
@ -514,6 +522,7 @@ load 10s
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
Metric: labels.FromStrings("__name__", "metric")},
},
nil,
},
Start: time.Unix(0, 0),
End: time.Unix(2, 0),
@ -525,6 +534,7 @@ load 10s
Result: Result{
ErrTooManySamples(env),
nil,
nil,
},
Start: time.Unix(0, 0),
End: time.Unix(2, 0),
@ -539,6 +549,7 @@ load 10s
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
Metric: labels.FromStrings("__name__", "metric")},
},
nil,
},
Start: time.Unix(0, 0),
End: time.Unix(10, 0),
@ -550,6 +561,7 @@ load 10s
Result: Result{
ErrTooManySamples(env),
nil,
nil,
},
Start: time.Unix(0, 0),
End: time.Unix(10, 0),

View file

@ -20,6 +20,7 @@ import (
"strings"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)
// Value is a generic interface for values resulting from a query evaluation.
@ -201,8 +202,9 @@ func (m Matrix) ContainsSameLabelset() bool {
// Result holds the resulting value of an execution or an error
// if any occurred.
type Result struct {
Err error
Value Value
Err error
Value Value
Warnings storage.Warnings
}
// Vector returns a Vector if the result value is one. An error is returned if

View file

@ -518,7 +518,7 @@ func (g *Group) RestoreForState(ts time.Time) {
matchers = append(matchers, mt)
}
sset, err := q.Select(nil, matchers...)
sset, err, _ := q.Select(nil, matchers...)
if err != nil {
level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(), "stage", "Select", "err", err)

View file

@ -538,7 +538,7 @@ func TestStaleness(t *testing.T) {
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
testutil.Ok(t, err)
set, err := querier.Select(nil, matcher)
set, err, _ := querier.Select(nil, matcher)
testutil.Ok(t, err)
samples, err := readSeriesSet(set)

View file

@ -68,23 +68,23 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error)
queriers := make([]Querier, 0, 1+len(f.secondaries))
// Add primary querier
querier, err := f.primary.Querier(ctx, mint, maxt)
primaryQuerier, err := f.primary.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}
queriers = append(queriers, querier)
queriers = append(queriers, primaryQuerier)
// Add secondary queriers
for _, storage := range f.secondaries {
querier, err := storage.Querier(ctx, mint, maxt)
if err != nil {
NewMergeQuerier(queriers).Close()
NewMergeQuerier(primaryQuerier, queriers).Close()
return nil, err
}
queriers = append(queriers, querier)
}
return NewMergeQuerier(queriers), nil
return NewMergeQuerier(primaryQuerier, queriers), nil
}
func (f *fanout) Appender() (Appender, error) {
@ -190,14 +190,18 @@ func (f *fanoutAppender) Rollback() (err error) {
// mergeQuerier implements Querier.
type mergeQuerier struct {
queriers []Querier
primaryQuerier Querier
queriers []Querier
failedQueriers map[Querier]struct{}
setQuerierMap map[SeriesSet]Querier
}
// NewMergeQuerier returns a new Querier that merges results of input queriers.
// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it,
// and will filter NoopQueriers from its arguments, in order to reduce overhead
// when only one querier is passed.
func NewMergeQuerier(queriers []Querier) Querier {
func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
filtered := make([]Querier, 0, len(queriers))
for _, querier := range queriers {
if querier != NoopQuerier() {
@ -205,6 +209,9 @@ func NewMergeQuerier(queriers []Querier) Querier {
}
}
setQuerierMap := make(map[SeriesSet]Querier)
failedQueriers := make(map[Querier]struct{})
switch len(filtered) {
case 0:
return NoopQuerier()
@ -212,22 +219,37 @@ func NewMergeQuerier(queriers []Querier) Querier {
return filtered[0]
default:
return &mergeQuerier{
queriers: filtered,
primaryQuerier: primaryQuerier,
queriers: filtered,
failedQueriers: failedQueriers,
setQuerierMap: setQuerierMap,
}
}
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error) {
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, error, Warnings) {
seriesSets := make([]SeriesSet, 0, len(q.queriers))
var warnings Warnings
for _, querier := range q.queriers {
set, err := querier.Select(params, matchers...)
set, err, wrn := querier.Select(params, matchers...)
q.setQuerierMap[set] = querier
if wrn != nil {
warnings = append(warnings, wrn...)
}
if err != nil {
return nil, err
q.failedQueriers[querier] = struct{}{}
// If the error source isn't the primary querier, return the error as a warning and continue.
if querier != q.primaryQuerier {
warnings = append(warnings, err)
continue
} else {
return nil, err, nil
}
}
seriesSets = append(seriesSets, set)
}
return NewMergeSeriesSet(seriesSets), nil
return NewMergeSeriesSet(seriesSets, q), nil, warnings
}
// LabelValues returns all potential values for a label name.
@ -243,6 +265,11 @@ func (q *mergeQuerier) LabelValues(name string) ([]string, error) {
return mergeStringSlices(results), nil
}
func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool {
_, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]]
return isFailedQuerier
}
func mergeStringSlices(ss [][]string) []string {
switch len(ss) {
case 0:
@ -322,11 +349,13 @@ type mergeSeriesSet struct {
currentSets []SeriesSet
heap seriesSetHeap
sets []SeriesSet
querier *mergeQuerier
}
// NewMergeSeriesSet returns a new series set that merges (deduplicates)
// series returned by the input series sets when iterating.
func NewMergeSeriesSet(sets []SeriesSet) SeriesSet {
func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
if len(sets) == 1 {
return sets[0]
}
@ -335,34 +364,53 @@ func NewMergeSeriesSet(sets []SeriesSet) SeriesSet {
// series under the cursor.
var h seriesSetHeap
for _, set := range sets {
if set == nil {
continue
}
if set.Next() {
heap.Push(&h, set)
}
}
return &mergeSeriesSet{
heap: h,
sets: sets,
heap: h,
sets: sets,
querier: querier,
}
}
func (c *mergeSeriesSet) Next() bool {
// Firstly advance all the current series sets. If any of them have run out
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.heap, set)
// Run in a loop because the "next" series sets may not be valid anymore.
// If a remote querier fails, we discard all series sets from that querier.
// If, for the current label set, all the next series sets come from
// failed remote storage sources, we want to keep trying with the next label set.
for {
// Firstly advance all the current series sets. If any of them have run out
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.heap, set)
}
}
if len(c.heap) == 0 {
return false
}
}
if len(c.heap) == 0 {
return false
}
// Now, pop items of the heap that have equal label sets.
c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(SeriesSet)
c.currentSets = append(c.currentSets, set)
// Now, pop items of the heap that have equal label sets.
c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(SeriesSet)
if c.querier != nil && c.querier.IsFailedSet(set) {
continue
}
c.currentSets = append(c.currentSets, set)
}
// As long as the current set contains at least 1 set,
// then it should return true.
if len(c.currentSets) != 0 {
break
}
}
return true
}

View file

@ -109,7 +109,7 @@ func TestMergeSeriesSet(t *testing.T) {
),
},
} {
merged := NewMergeSeriesSet(tc.input)
merged := NewMergeSeriesSet(tc.input, nil)
for merged.Next() {
require.True(t, tc.expected.Next())
actualSeries := merged.At()
@ -262,7 +262,7 @@ func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet {
for i := 0; i < numSeriesSets; i++ {
seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples))
}
return NewMergeSeriesSet(seriesSets)
return NewMergeSeriesSet(seriesSets, nil)
}
func benchmarkDrain(seriesSet SeriesSet, b *testing.B) {

View file

@ -52,7 +52,7 @@ type Queryable interface {
// Querier provides reading access to time series data.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error)
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error, Warnings)
// LabelValues returns all potential values for a label name.
LabelValues(name string) ([]string, error)
@ -122,3 +122,5 @@ type SeriesIterator interface {
// Err returns the current error.
Err() error
}
type Warnings []error

View file

@ -26,8 +26,8 @@ func NoopQuerier() Querier {
return noopQuerier{}
}
func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error) {
return NoopSeriesSet(), nil
func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, error, Warnings) {
return NoopSeriesSet(), nil, nil
}
func (noopQuerier) LabelValues(name string) ([]string, error) {

View file

@ -59,10 +59,10 @@ type querier struct {
// Select implements storage.Querier and uses the given matchers to read series
// sets from the Client.
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
query, err := ToQuery(q.mint, q.maxt, matchers, p)
if err != nil {
return nil, err
return nil, err, nil
}
remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.Name())
@ -71,10 +71,10 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (
res, err := q.client.Read(q.ctx, query)
if err != nil {
return nil, err
return nil, err, nil
}
return FromQueryResult(res), nil
return FromQueryResult(res), nil, nil
}
// LabelValues implements storage.Querier and is a noop.
@ -117,13 +117,13 @@ type externalLabelsQuerier struct {
// Select adds equality matchers for all external labels to the list of matchers
// before calling the wrapped storage.Queryable. The added external labels are
// removed from the returned series sets.
func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
m, added := q.addExternalLabels(matchers)
s, err := q.Querier.Select(p, m...)
s, err, warnings := q.Querier.Select(p, m...)
if err != nil {
return nil, err
return nil, err, warnings
}
return newSeriesSetFilter(s, added), nil
return newSeriesSetFilter(s, added), nil, warnings
}
// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier
@ -170,7 +170,7 @@ type requiredMatchersQuerier struct {
// Select returns a NoopSeriesSet if the given matchers don't match the label
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
ms := q.requiredMatchers
for _, m := range matchers {
for i, r := range ms {
@ -184,7 +184,7 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la
}
}
if len(ms) > 0 {
return storage.NoopSeriesSet(), nil
return storage.NoopSeriesSet(), nil, nil
}
return q.Querier.Select(p, matchers...)
}
@ -225,6 +225,15 @@ func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.S
type seriesSetFilter struct {
storage.SeriesSet
toFilter model.LabelSet
querier storage.Querier
}
func (ssf *seriesSetFilter) GetQuerier() storage.Querier {
return ssf.querier
}
func (ssf *seriesSetFilter) SetQuerier(querier storage.Querier) {
ssf.querier = querier
}
func (ssf seriesSetFilter) At() storage.Series {

View file

@ -42,7 +42,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
externalLabels: model.LabelSet{"region": "europe"},
}
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
have, err := q.Select(nil, matchers...)
have, err, _ := q.Select(nil, matchers...)
if err != nil {
t.Error(err)
}
@ -157,8 +157,8 @@ type mockSeriesSet struct {
storage.SeriesSet
}
func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) {
return mockSeriesSet{}, nil
func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
return mockSeriesSet{}, nil, nil
}
func TestPreferLocalStorageFilter(t *testing.T) {
@ -313,7 +313,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
requiredMatchers: test.requiredMatchers,
}
have, err := q.Select(nil, test.matchers...)
have, err, _ := q.Select(nil, test.matchers...)
if err != nil {
t.Error(err)
}

View file

@ -140,7 +140,7 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie
}
queriers = append(queriers, q)
}
return storage.NewMergeQuerier(queriers), nil
return storage.NewMergeQuerier(nil, queriers), nil
}
// Close the background processing of the storage queues.

View file

@ -230,7 +230,7 @@ type querier struct {
q tsdb.Querier
}
func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) {
func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error, storage.Warnings) {
ms := make([]tsdbLabels.Matcher, 0, len(oms))
for _, om := range oms {
@ -238,9 +238,9 @@ func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storag
}
set, err := q.q.Select(ms...)
if err != nil {
return nil, err
return nil, err, nil
}
return seriesSet{set: set}, nil
return seriesSet{set: set}, nil, nil
}
func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }

View file

@ -119,6 +119,14 @@ type response struct {
Data interface{} `json:"data,omitempty"`
ErrorType errorType `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
Warnings []string `json:"warnings,omitempty"`
}
type apiFuncResult struct {
data interface{}
err *apiError
warnings storage.Warnings
finalizer func()
}
// Enables cross-site script calls.
@ -128,7 +136,7 @@ func setCORS(w http.ResponseWriter) {
}
}
type apiFunc func(r *http.Request) (interface{}, *apiError, func())
type apiFunc func(r *http.Request) apiFuncResult
// TSDBAdmin defines the tsdb interfaces used by the v1 API for admin operations.
type TSDBAdmin interface {
@ -204,16 +212,16 @@ func (api *API) Register(r *route.Router) {
wrap := func(f apiFunc) http.HandlerFunc {
hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
setCORS(w)
data, err, finalizer := f(r)
if err != nil {
api.respondError(w, err, data)
} else if data != nil {
api.respond(w, data)
result := f(r)
if result.err != nil {
api.respondError(w, result.err, result.data)
} else if result.data != nil {
api.respond(w, result.data, result.warnings)
} else {
w.WriteHeader(http.StatusNoContent)
}
if finalizer != nil {
finalizer()
if result.finalizer != nil {
result.finalizer()
}
})
return api.ready(httputil.CompressionHandler{
@ -258,17 +266,17 @@ type queryData struct {
Stats *stats.QueryStats `json:"stats,omitempty"`
}
func (api *API) options(r *http.Request) (interface{}, *apiError, func()) {
return nil, nil, nil
func (api *API) options(r *http.Request) apiFuncResult {
return apiFuncResult{nil, nil, nil, nil}
}
func (api *API) query(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) query(r *http.Request) apiFuncResult {
var ts time.Time
if t := r.FormValue("time"); t != "" {
var err error
ts, err = parseTime(t)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
} else {
ts = api.now()
@ -279,7 +287,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) {
var cancel context.CancelFunc
timeout, err := parseDuration(to)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
ctx, cancel = context.WithTimeout(ctx, timeout)
@ -288,12 +296,12 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) {
qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
res := qry.Exec(ctx)
if res.Err != nil {
return nil, returnAPIError(res.Err), qry.Close
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}
// Optional stats field in response if parameter "stats" is not empty.
@ -302,42 +310,42 @@ func (api *API) query(r *http.Request) (interface{}, *apiError, func()) {
qs = stats.NewQueryStats(qry.Stats())
}
return &queryData{
return apiFuncResult{&queryData{
ResultType: res.Value.Type(),
Result: res.Value,
Stats: qs,
}, nil, qry.Close
}, nil, res.Warnings, qry.Close}
}
func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) queryRange(r *http.Request) apiFuncResult {
start, err := parseTime(r.FormValue("start"))
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
end, err := parseTime(r.FormValue("end"))
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
if end.Before(start) {
err := errors.New("end timestamp must not be before start time")
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
step, err := parseDuration(r.FormValue("step"))
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
if step <= 0 {
err := errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if end.Sub(start)/step > 11000 {
err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
ctx := r.Context()
@ -345,7 +353,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) {
var cancel context.CancelFunc
timeout, err := parseDuration(to)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
ctx, cancel = context.WithTimeout(ctx, timeout)
@ -354,12 +362,12 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) {
qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
res := qry.Exec(ctx)
if res.Err != nil {
return nil, returnAPIError(res.Err), qry.Close
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}
// Optional stats field in response if parameter "stats" is not empty.
@ -368,11 +376,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) {
qs = stats.NewQueryStats(qry.Stats())
}
return &queryData{
return apiFuncResult{&queryData{
ResultType: res.Value.Type(),
Result: res.Value,
Stats: qs,
}, nil, qry.Close
}, nil, res.Warnings, qry.Close}
}
func returnAPIError(err error) *apiError {
@ -392,39 +400,39 @@ func returnAPIError(err error) *apiError {
return &apiError{errorExec, err}
}
func (api *API) labelNames(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) labelNames(r *http.Request) apiFuncResult {
q, err := api.Queryable.Querier(r.Context(), math.MinInt64, math.MaxInt64)
if err != nil {
return nil, &apiError{errorExec, err}, nil
return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
}
defer q.Close()
names, err := q.LabelNames()
if err != nil {
return nil, &apiError{errorExec, err}, nil
return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
}
return names, nil, nil
return apiFuncResult{names, nil, nil, nil}
}
func (api *API) labelValues(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) labelValues(r *http.Request) apiFuncResult {
ctx := r.Context()
name := route.Param(ctx, "name")
if !model.LabelNameRE.MatchString(name) {
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}, nil
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}, nil, nil}
}
q, err := api.Queryable.Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, &apiError{errorExec, err}, nil
return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
}
defer q.Close()
vals, err := q.LabelValues(name)
if err != nil {
return nil, &apiError{errorExec, err}, nil
return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
}
return vals, nil, nil
return apiFuncResult{vals, nil, nil, nil}
}
var (
@ -432,12 +440,12 @@ var (
maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999)
)
func (api *API) series(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) series(r *http.Request) apiFuncResult {
if err := r.ParseForm(); err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil, nil}
}
if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil, nil}
}
var start time.Time
@ -445,7 +453,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) {
var err error
start, err = parseTime(t)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
} else {
start = minTime
@ -456,7 +464,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) {
var err error
end, err = parseTime(t)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
} else {
end = maxTime
@ -466,40 +474,42 @@ func (api *API) series(r *http.Request) (interface{}, *apiError, func()) {
for _, s := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
matcherSets = append(matcherSets, matchers)
}
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, &apiError{errorExec, err}, nil
return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
}
defer q.Close()
var sets []storage.SeriesSet
var warnings storage.Warnings
for _, mset := range matcherSets {
s, err := q.Select(nil, mset...)
s, err, wrn := q.Select(nil, mset...) //TODO
warnings = append(warnings, wrn...)
if err != nil {
return nil, &apiError{errorExec, err}, nil
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil}
}
sets = append(sets, s)
}
set := storage.NewMergeSeriesSet(sets)
set := storage.NewMergeSeriesSet(sets, nil)
metrics := []labels.Labels{}
for set.Next() {
metrics = append(metrics, set.At().Labels())
}
if set.Err() != nil {
return nil, &apiError{errorExec, set.Err()}, nil
return apiFuncResult{nil, &apiError{errorExec, set.Err()}, warnings, nil}
}
return metrics, nil, nil
return apiFuncResult{metrics, nil, warnings, nil}
}
func (api *API) dropSeries(r *http.Request) (interface{}, *apiError, func()) {
return nil, &apiError{errorInternal, fmt.Errorf("not implemented")}, nil
func (api *API) dropSeries(r *http.Request) apiFuncResult {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("not implemented")}, nil, nil}
}
// Target has the information for one target.
@ -528,7 +538,7 @@ type TargetDiscovery struct {
DroppedTargets []*DroppedTarget `json:"droppedTargets"`
}
func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) targets(r *http.Request) apiFuncResult {
flatten := func(targets map[string][]*scrape.Target) []*scrape.Target {
var n int
keys := make([]string, 0, len(targets))
@ -570,7 +580,7 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) {
DiscoveredLabels: t.DiscoveredLabels().Map(),
})
}
return res, nil, nil
return apiFuncResult{res, nil, nil, nil}
}
func matchLabels(lset labels.Labels, matchers []*labels.Matcher) bool {
@ -582,18 +592,18 @@ func matchLabels(lset labels.Labels, matchers []*labels.Matcher) bool {
return true
}
func (api *API) targetMetadata(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) targetMetadata(r *http.Request) apiFuncResult {
limit := -1
if s := r.FormValue("limit"); s != "" {
var err error
if limit, err = strconv.Atoi(s); err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("limit must be a number")}, nil
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("limit must be a number")}, nil, nil}
}
}
matchers, err := promql.ParseMetricSelector(r.FormValue("match_target"))
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
metric := r.FormValue("metric")
@ -633,9 +643,9 @@ func (api *API) targetMetadata(r *http.Request) (interface{}, *apiError, func())
}
}
if len(res) == 0 {
return nil, &apiError{errorNotFound, errors.New("specified metadata not found")}, nil
return apiFuncResult{nil, &apiError{errorNotFound, errors.New("specified metadata not found")}, nil, nil}
}
return res, nil, nil
return apiFuncResult{res, nil, nil, nil}
}
type metricMetadata struct {
@ -657,7 +667,7 @@ type AlertmanagerTarget struct {
URL string `json:"url"`
}
func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) alertmanagers(r *http.Request) apiFuncResult {
urls := api.alertmanagerRetriever.Alertmanagers()
droppedURLS := api.alertmanagerRetriever.DroppedAlertmanagers()
ams := &AlertmanagerDiscovery{ActiveAlertmanagers: make([]*AlertmanagerTarget, len(urls)), DroppedAlertmanagers: make([]*AlertmanagerTarget, len(droppedURLS))}
@ -667,7 +677,7 @@ func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError, func())
for i, url := range droppedURLS {
ams.DroppedAlertmanagers[i] = &AlertmanagerTarget{URL: url.String()}
}
return ams, nil, nil
return apiFuncResult{ams, nil, nil, nil}
}
// AlertDiscovery has info for all active alerts.
@ -684,7 +694,7 @@ type Alert struct {
Value float64 `json:"value"`
}
func (api *API) alerts(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) alerts(r *http.Request) apiFuncResult {
alertingRules := api.rulesRetriever.AlertingRules()
alerts := []*Alert{}
@ -697,7 +707,7 @@ func (api *API) alerts(r *http.Request) (interface{}, *apiError, func()) {
res := &AlertDiscovery{Alerts: alerts}
return res, nil, nil
return apiFuncResult{res, nil, nil, nil}
}
func rulesAlertsToAPIAlerts(rulesAlerts []*rules.Alert) []*Alert {
@ -756,7 +766,7 @@ type recordingRule struct {
Type string `json:"type"`
}
func (api *API) rules(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) rules(r *http.Request) apiFuncResult {
ruleGroups := api.rulesRetriever.RuleGroups()
res := &RuleDiscovery{RuleGroups: make([]*RuleGroup, len(ruleGroups))}
for i, grp := range ruleGroups {
@ -799,29 +809,29 @@ func (api *API) rules(r *http.Request) (interface{}, *apiError, func()) {
}
default:
err := fmt.Errorf("failed to assert type of rule '%v'", rule.Name())
return nil, &apiError{errorInternal, err}, nil
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
}
apiRuleGroup.Rules = append(apiRuleGroup.Rules, enrichedRule)
}
res.RuleGroups[i] = apiRuleGroup
}
return res, nil, nil
return apiFuncResult{res, nil, nil, nil}
}
type prometheusConfig struct {
YAML string `json:"yaml"`
}
func (api *API) serveConfig(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) serveConfig(r *http.Request) apiFuncResult {
cfg := &prometheusConfig{
YAML: api.config().String(),
}
return cfg, nil, nil
return apiFuncResult{cfg, nil, nil, nil}
}
func (api *API) serveFlags(r *http.Request) (interface{}, *apiError, func()) {
return api.flagsMap, nil, nil
func (api *API) serveFlags(r *http.Request) apiFuncResult {
return apiFuncResult{api.flagsMap, nil, nil, nil}
}
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
@ -873,7 +883,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
}
}
set, err := querier.Select(selectParams, filteredMatchers...)
set, err, _ := querier.Select(selectParams, filteredMatchers...)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -911,20 +921,20 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
}
}
func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if !api.enableAdmin {
return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil}
}
db := api.db()
if db == nil {
return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
if err := r.ParseForm(); err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("error parsing form values: %v", err)}, nil, nil}
}
if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil, nil}
}
var start time.Time
@ -932,7 +942,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
var err error
start, err = parseTime(t)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
} else {
start = minTime
@ -943,7 +953,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
var err error
end, err = parseTime(t)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
} else {
end = maxTime
@ -952,7 +962,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
for _, s := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s)
if err != nil {
return nil, &apiError{errorBadData, err}, nil
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
var selector tsdbLabels.Selector
@ -961,16 +971,16 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) {
}
if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), selector...); err != nil {
return nil, &apiError{errorInternal, err}, nil
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
}
}
return nil, nil, nil
return apiFuncResult{nil, nil, nil, nil}
}
func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) snapshot(r *http.Request) apiFuncResult {
if !api.enableAdmin {
return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil}
}
var (
skipHead bool
@ -979,13 +989,13 @@ func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) {
if r.FormValue("skip_head") != "" {
skipHead, err = strconv.ParseBool(r.FormValue("skip_head"))
if err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("unable to parse boolean 'skip_head' argument: %v", err)}, nil
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("unable to parse boolean 'skip_head' argument: %v", err)}, nil, nil}
}
}
db := api.db()
if db == nil {
return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
var (
@ -996,31 +1006,31 @@ func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) {
dir = filepath.Join(snapdir, name)
)
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)}, nil
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)}, nil, nil}
}
if err := db.Snapshot(dir, !skipHead); err != nil {
return nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)}, nil
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)}, nil, nil}
}
return struct {
return apiFuncResult{struct {
Name string `json:"name"`
}{name}, nil, nil
}{name}, nil, nil, nil}
}
func (api *API) cleanTombstones(r *http.Request) (interface{}, *apiError, func()) {
func (api *API) cleanTombstones(r *http.Request) apiFuncResult {
if !api.enableAdmin {
return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil, nil}
}
db := api.db()
if db == nil {
return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
if err := db.CleanTombstones(); err != nil {
return nil, &apiError{errorInternal, err}, nil
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
}
return nil, nil, nil
return apiFuncResult{nil, nil, nil, nil}
}
func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher {
@ -1075,11 +1085,17 @@ func mergeLabels(primary, secondary []*prompb.Label) []*prompb.Label {
return result
}
func (api *API) respond(w http.ResponseWriter, data interface{}) {
func (api *API) respond(w http.ResponseWriter, data interface{}, warnings storage.Warnings) {
statusMessage := statusSuccess
var warningStrings []string
for _, warning := range warnings {
warningStrings = append(warningStrings, warning.Error())
}
json := jsoniter.ConfigCompatibleWithStandardLibrary
b, err := json.Marshal(&response{
Status: statusSuccess,
Data: data,
Status: statusMessage,
Data: data,
Warnings: warningStrings,
})
if err != nil {
level.Error(api.logger).Log("msg", "error marshaling json response", "err", err)

View file

@ -349,9 +349,9 @@ func TestLabelNames(t *testing.T) {
ctx := context.Background()
req, err := request(method)
testutil.Ok(t, err)
resp, apiErr, _ := api.labelNames(req.WithContext(ctx))
assertAPIError(t, apiErr, "")
assertAPIResponse(t, resp, []string{"__name__", "baz", "foo", "foo1", "foo2", "xyz"})
res := api.labelNames(req.WithContext(ctx))
assertAPIError(t, res.err, "")
assertAPIResponse(t, res.data, []string{"__name__", "baz", "foo", "foo1", "foo2", "xyz"})
}
}
@ -379,7 +379,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
}
defer querier.Close()
set, err := querier.Select(selectParams, matchers...)
set, err, _ := querier.Select(selectParams, matchers...)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -857,9 +857,9 @@ func testEndpoints(t *testing.T, api *API, testLabelAPI bool) {
if err != nil {
t.Fatal(err)
}
resp, apiErr, _ := test.endpoint(req.WithContext(ctx))
assertAPIError(t, apiErr, test.errType)
assertAPIResponse(t, resp, test.response)
res := test.endpoint(req.WithContext(ctx))
assertAPIError(t, res.err, test.errType)
assertAPIResponse(t, res.data, test.response)
}
}
}
@ -1202,8 +1202,8 @@ func TestAdminEndpoints(t *testing.T) {
if err != nil {
t.Fatalf("Error when creating test request: %s", err)
}
_, apiErr, _ := endpoint(req)
assertAPIError(t, apiErr, tc.errType)
res := endpoint(req)
assertAPIError(t, res.err, tc.errType)
})
}
}
@ -1211,7 +1211,7 @@ func TestAdminEndpoints(t *testing.T) {
func TestRespondSuccess(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
api := API{}
api.respond(w, "test")
api.respond(w, "test", nil)
}))
defer s.Close()
@ -1502,7 +1502,7 @@ func TestRespond(t *testing.T) {
for _, c := range cases {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
api := API{}
api.respond(w, c.response)
api.respond(w, c.response, nil)
}))
defer s.Close()
@ -1543,6 +1543,6 @@ func BenchmarkRespond(b *testing.B) {
b.ResetTimer()
api := API{}
for n := 0; n < b.N; n++ {
api.respond(&testResponseWriter, response)
api.respond(&testResponseWriter, response, nil)
}
}

View file

@ -37,6 +37,10 @@ var (
Name: "prometheus_web_federation_errors_total",
Help: "Total number of errors that occurred while sending federation responses.",
})
federationWarnings = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_web_federation_warnings_total",
Help: "Total number of warnings that occurred while sending federation responses.",
})
)
func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
@ -83,7 +87,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
var sets []storage.SeriesSet
for _, mset := range matcherSets {
s, err := q.Select(params, mset...)
s, err, wrns := q.Select(params, mset...)
if wrns != nil {
level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns)
federationErrors.Add(float64(len(wrns)))
}
if err != nil {
federationErrors.Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
@ -92,7 +100,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sets = append(sets, s)
}
set := storage.NewMergeSeriesSet(sets)
set := storage.NewMergeSeriesSet(sets, nil)
it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6))
for set.Next() {
s := set.At()