Merge pull request #783 from prometheus/fabxc/api-v1-ext

/series endpoints for API v1 and scalar range queries
This commit is contained in:
Fabian Reinartz 2015-06-11 14:06:04 +02:00
commit 70e0be6295
4 changed files with 334 additions and 47 deletions

View file

@ -279,6 +279,9 @@ func (ng *Engine) NewRangeQuery(qs string, start, end clientmodel.Timestamp, int
if err != nil {
return nil, err
}
if expr.Type() != ExprVector && expr.Type() != ExprScalar {
return nil, fmt.Errorf("invalid expression type %q for range query, must be scalar or vector", expr.Type())
}
qry := ng.newQuery(expr, start, end, interval)
qry.q = qs
@ -413,6 +416,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
evalTimer.Stop()
return val, nil
}
numSteps := int(s.End.Sub(s.Start) / s.Interval)
// Range evaluation.
sampleStreams := map[clientmodel.Fingerprint]*SampleStream{}
@ -430,26 +434,36 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
if err != nil {
return nil, err
}
vector, ok := val.(Vector)
if !ok {
return nil, fmt.Errorf("value for expression %q must be of type vector but is %s", s.Expr, val.Type())
}
for _, sample := range vector {
samplePair := metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
switch v := val.(type) {
case *Scalar:
// As the expression type does not change we can safely default to 0
// as the fingerprint for scalar expressions.
ss := sampleStreams[0]
if ss == nil {
ss = &SampleStream{Values: make(metric.Values, 0, numSteps)}
sampleStreams[0] = ss
}
fp := sample.Metric.Metric.Fingerprint()
if sampleStreams[fp] == nil {
sampleStreams[fp] = &SampleStream{
Metric: sample.Metric,
Values: metric.Values{samplePair},
ss.Values = append(ss.Values, metric.SamplePair{
Value: v.Value,
Timestamp: v.Timestamp,
})
case Vector:
for _, sample := range v {
fp := sample.Metric.Metric.Fingerprint()
ss := sampleStreams[fp]
if ss == nil {
ss = &SampleStream{Values: make(metric.Values, 0, numSteps)}
sampleStreams[fp] = ss
}
} else {
sampleStreams[fp].Values = append(sampleStreams[fp].Values, samplePair)
}
ss.Values = append(ss.Values, metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
}
default:
panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
}
}
evalTimer.Stop()

View file

@ -70,6 +70,35 @@ func ParseExpr(input string) (Expr, error) {
return expr, err
}
// ParseMetric parses the input into a metric
func ParseMetric(input string) (m clientmodel.Metric, err error) {
p := newParser(input)
defer p.recover(&err)
m = p.metric()
if p.peek().typ != itemEOF {
p.errorf("could not parse remaining input %.15q...", p.lex.input[p.lex.lastPos:])
}
return m, nil
}
// ParseMetricSelector parses the provided textual metric selector into a list of
// label matchers.
func ParseMetricSelector(input string) (m metric.LabelMatchers, err error) {
p := newParser(input)
defer p.recover(&err)
name := ""
if t := p.peek().typ; t == itemMetricIdentifier || t == itemIdentifier {
name = p.next().val
}
vs := p.vectorSelector(name)
if p.peek().typ != itemEOF {
p.errorf("could not parse remaining input %.15q...", p.lex.input[p.lex.lastPos:])
}
return vs.LabelMatchers, nil
}
// parseSeriesDesc parses the description of a time series.
func parseSeriesDesc(input string) (clientmodel.Metric, []sequenceValue, error) {
p := newParser(input)
@ -137,20 +166,7 @@ func (v sequenceValue) String() string {
func (p *parser) parseSeriesDesc() (m clientmodel.Metric, vals []sequenceValue, err error) {
defer p.recover(&err)
name := ""
m = clientmodel.Metric{}
t := p.peek().typ
if t == itemIdentifier || t == itemMetricIdentifier {
name = p.next().val
t = p.peek().typ
}
if t == itemLeftBrace {
m = clientmodel.Metric(p.labelSet())
}
if name != "" {
m[clientmodel.MetricNameLabel] = clientmodel.LabelValue(name)
}
m = p.metric()
const ctx = "series values"
for {
@ -810,6 +826,32 @@ func (p *parser) labelMatchers(operators ...itemType) metric.LabelMatchers {
return matchers
}
// metric parses a metric.
//
// <label_set>
// <metric_identifier> [<label_set>]
//
func (p *parser) metric() clientmodel.Metric {
name := ""
m := clientmodel.Metric{}
t := p.peek().typ
if t == itemIdentifier || t == itemMetricIdentifier {
name = p.next().val
t = p.peek().typ
}
if t != itemLeftBrace && name == "" {
p.errorf("missing metric name or metric selector")
}
if t == itemLeftBrace {
m = clientmodel.Metric(p.labelSet())
}
if name != "" {
m[clientmodel.MetricNameLabel] = clientmodel.LabelValue(name)
}
return m
}
// metricSelector parses a new metric selector.
//
// <metric_identifier> [<label_matchers>] [ offset <duration> ]

View file

@ -93,6 +93,9 @@ func (api *API) Register(r *route.Router) {
r.Get("/query_range", instr("query_range", api.queryRange))
r.Get("/label/:name/values", instr("label_values", api.labelValues))
r.Get("/series", instr("series", api.series))
r.Del("/series", instr("drop_series", api.dropSeries))
}
type queryData struct {
@ -112,7 +115,13 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) {
res := qry.Exec()
if res.Err != nil {
return nil, &apiError{errorBadData, res.Err}
switch res.Err.(type) {
case promql.ErrQueryCanceled:
return nil, &apiError{errorCanceled, res.Err}
case promql.ErrQueryTimeout:
return nil, &apiError{errorTimeout, res.Err}
}
return nil, &apiError{errorExec, res.Err}
}
return &queryData{
ResultType: res.Value.Type(),
@ -143,18 +152,18 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) {
qry, err := api.QueryEngine.NewRangeQuery(r.FormValue("query"), start, end, step)
if err != nil {
switch err.(type) {
case promql.ErrQueryCanceled:
return nil, &apiError{errorCanceled, err}
case promql.ErrQueryTimeout:
return nil, &apiError{errorTimeout, err}
}
return nil, &apiError{errorExec, err}
return nil, &apiError{errorBadData, err}
}
res := qry.Exec()
if res.Err != nil {
return nil, &apiError{errorBadData, err}
switch res.Err.(type) {
case promql.ErrQueryCanceled:
return nil, &apiError{errorCanceled, res.Err}
case promql.ErrQueryTimeout:
return nil, &apiError{errorTimeout, res.Err}
}
return nil, &apiError{errorExec, res.Err}
}
return &queryData{
ResultType: res.Value.Type(),
@ -174,6 +183,60 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) {
return vals, nil
}
func (api *API) series(r *http.Request) (interface{}, *apiError) {
r.ParseForm()
if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}
}
fps := map[clientmodel.Fingerprint]struct{}{}
for _, lm := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(lm)
if err != nil {
return nil, &apiError{errorBadData, err}
}
for _, fp := range api.Storage.FingerprintsForLabelMatchers(matchers) {
fps[fp] = struct{}{}
}
}
metrics := make([]clientmodel.Metric, 0, len(fps))
for fp := range fps {
if met := api.Storage.MetricForFingerprint(fp).Metric; met != nil {
metrics = append(metrics, met)
}
}
return metrics, nil
}
func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
r.ParseForm()
if len(r.Form["match[]"]) == 0 {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}
}
fps := map[clientmodel.Fingerprint]struct{}{}
for _, lm := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(lm)
if err != nil {
return nil, &apiError{errorBadData, err}
}
for _, fp := range api.Storage.FingerprintsForLabelMatchers(matchers) {
fps[fp] = struct{}{}
}
}
for fp := range fps {
api.Storage.DropMetricsForFingerprints(fp)
}
res := struct {
NumDeleted int `json:"numDeleted"`
}{
NumDeleted: len(fps),
}
return res, nil
}
func respond(w http.ResponseWriter, data interface{}) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")

View file

@ -16,6 +16,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/route"
)
@ -90,6 +91,82 @@ func TestEndpoints(t *testing.T) {
},
},
},
{
endpoint: api.queryRange,
query: url.Values{
"query": []string{"time()"},
"start": []string{"0"},
"end": []string{"2"},
"step": []string{"1"},
},
response: &queryData{
ResultType: promql.ExprMatrix,
Result: promql.Matrix{
&promql.SampleStream{
Values: metric.Values{
{Value: 0, Timestamp: start},
{Value: 1, Timestamp: start.Add(1 * time.Second)},
{Value: 2, Timestamp: start.Add(2 * time.Second)},
},
},
},
},
},
// Missing query params in range queries.
{
endpoint: api.queryRange,
query: url.Values{
"query": []string{"time()"},
"end": []string{"2"},
"step": []string{"1"},
},
errType: errorBadData,
},
{
endpoint: api.queryRange,
query: url.Values{
"query": []string{"time()"},
"start": []string{"0"},
"step": []string{"1"},
},
errType: errorBadData,
},
{
endpoint: api.queryRange,
query: url.Values{
"query": []string{"time()"},
"start": []string{"0"},
"end": []string{"2"},
},
errType: errorBadData,
},
// Missing evaluation time.
{
endpoint: api.query,
query: url.Values{
"query": []string{"0.333"},
},
errType: errorBadData,
},
// Bad query expression.
{
endpoint: api.query,
query: url.Values{
"query": []string{"invalid][query"},
"time": []string{"1970-01-01T01:02:03+01:00"},
},
errType: errorBadData,
},
{
endpoint: api.queryRange,
query: url.Values{
"query": []string{"invalid][query"},
"start": []string{"0"},
"end": []string{"100"},
"step": []string{"1"},
},
errType: errorBadData,
},
{
endpoint: api.labelValues,
params: map[string]string{
@ -100,13 +177,6 @@ func TestEndpoints(t *testing.T) {
"test_metric2",
},
},
{
endpoint: api.labelValues,
params: map[string]string{
"name": "not!!!allowed",
},
errType: errorBadData,
},
{
endpoint: api.labelValues,
params: map[string]string{
@ -117,6 +187,102 @@ func TestEndpoints(t *testing.T) {
"boo",
},
},
// Bad name parameter.
{
endpoint: api.labelValues,
params: map[string]string{
"name": "not!!!allowed",
},
errType: errorBadData,
},
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`test_metric2`},
},
response: []clientmodel.Metric{
{
"__name__": "test_metric2",
"foo": "boo",
},
},
},
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`test_metric1{foo=~"o$"}`},
},
response: []clientmodel.Metric{
{
"__name__": "test_metric1",
"foo": "boo",
},
},
},
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`test_metric1{foo=~"o$"}`, `test_metric1{foo=~"o$"}`},
},
response: []clientmodel.Metric{
{
"__name__": "test_metric1",
"foo": "boo",
},
},
},
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`test_metric1{foo=~"o$"}`, `none`},
},
response: []clientmodel.Metric{
{
"__name__": "test_metric1",
"foo": "boo",
},
},
},
// Missing match[] query params in series requests.
{
endpoint: api.series,
errType: errorBadData,
},
{
endpoint: api.dropSeries,
errType: errorBadData,
},
// The following tests delete time series from the test storage. They
// must remain at the end and are fixed in their order.
{
endpoint: api.dropSeries,
query: url.Values{
"match[]": []string{`test_metric1{foo=~"o$"}`},
},
response: struct {
NumDeleted int `json:"numDeleted"`
}{1},
},
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`test_metric1`},
},
response: []clientmodel.Metric{
{
"__name__": "test_metric1",
"foo": "bar",
},
},
}, {
endpoint: api.dropSeries,
query: url.Values{
"match[]": []string{`{__name__=~".*"}`},
},
response: struct {
NumDeleted int `json:"numDeleted"`
}{2},
},
}
for _, test := range tests {
@ -147,8 +313,10 @@ func TestEndpoints(t *testing.T) {
t.Fatalf("Expected error of type %q but got none", test.errType)
}
if !reflect.DeepEqual(resp, test.response) {
t.Fatalf("Response does not match, expected:\n%v\ngot:\n%v", test.response, resp)
t.Fatalf("Response does not match, expected:\n%#v\ngot:\n%#v", test.response, resp)
}
// Ensure that removed metrics are unindexed before the next request.
suite.Storage().WaitForIndexing()
}
}