From 289ba11b79a4ddc02a5566f6344dd33533dfeb2f Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 16 Mar 2021 02:47:45 -0700 Subject: [PATCH] Add circular in-memory exemplars storage (#6635) * Add circular in-memory exemplars storage Signed-off-by: Callum Styan Signed-off-by: Tom Wilkie Signed-off-by: Ganesh Vernekar Signed-off-by: Martin Disibio Co-authored-by: Ganesh Vernekar Co-authored-by: Tom Wilkie Co-authored-by: Martin Disibio * Fix some comments, clean up exemplar metrics struct and exemplar tests. Signed-off-by: Callum Styan * Fix exemplar query api null vs empty array issue. Signed-off-by: Callum Styan Co-authored-by: Ganesh Vernekar Co-authored-by: Tom Wilkie Co-authored-by: Martin Disibio --- cmd/prometheus/main.go | 26 +- docs/querying/api.md | 66 ++++++ pkg/exemplar/exemplar.go | 22 ++ pkg/textparse/openmetricsparse_test.go | 5 +- promql/parser/ast.go | 12 + promql/parser/parse_test.go | 36 +++ promql/test.go | 9 + scrape/helpers_test.go | 28 ++- scrape/scrape.go | 51 +++- scrape/scrape_test.go | 163 +++++++++++++ storage/fanout.go | 15 ++ storage/interface.go | 42 +++- storage/remote/write.go | 5 + storage/remote/write_handler_test.go | 6 + tsdb/db.go | 9 + tsdb/exemplar.go | 209 ++++++++++++++++ tsdb/exemplar_test.go | 316 +++++++++++++++++++++++++ tsdb/head.go | 137 ++++++++++- tsdb/head_test.go | 23 ++ util/teststorage/storage.go | 25 +- web/api/v1/api.go | 118 +++++++-- web/api/v1/api_test.go | 182 +++++++++++++- web/web.go | 37 +-- 23 files changed, 1478 insertions(+), 64 deletions(-) create mode 100644 tsdb/exemplar.go create mode 100644 tsdb/exemplar_test.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b458abf688..1a7a6b7dba 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -59,6 +59,7 @@ import ( "github.com/prometheus/prometheus/discovery" _ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations. 
"github.com/prometheus/prometheus/notifier" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/logging" "github.com/prometheus/prometheus/pkg/relabel" @@ -128,6 +129,9 @@ type flagConfig struct { // setFeatureListOptions sets the corresponding options from the featureList. func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { + maxExemplars := c.tsdb.MaxExemplars + // Disabled at first. Value from the flag is used if exemplar-storage is set. + c.tsdb.MaxExemplars = 0 for _, f := range c.featureList { opts := strings.Split(f, ",") for _, o := range opts { @@ -141,6 +145,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "remote-write-receiver": c.web.RemoteWriteReceiver = true level.Info(logger).Log("msg", "Experimental remote-write-receiver enabled") + case "exemplar-storage": + c.tsdb.MaxExemplars = maxExemplars + level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled") case "": continue default: @@ -267,6 +274,9 @@ func main() { a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well. 1MB as recommended by protobuf by default."). Default("1048576").IntVar(&cfg.web.RemoteReadBytesInFrame) + a.Flag("storage.exemplars.exemplars-limit", "[EXPERIMENTAL] Maximum number of exemplars to store in in-memory exemplar storage total. 0 disables the exemplar storage. This flag is effective only with --enable-feature=exemplar-storage."). + Default("100000").IntVar(&cfg.tsdb.MaxExemplars) + a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). Default("1h").SetValue(&cfg.outageTolerance) @@ -297,7 +307,7 @@ func main() { a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. 
Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). Default("50000000").IntVar(&cfg.queryMaxSamples) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier, 'remote-write-receiver' to enable remote write receiver. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier, 'remote-write-receiver' to enable remote write receiver, 'exemplar-storage' to enable the in-memory exemplar storage. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details."). Default("").StringsVar(&cfg.featureList) promlogflag.AddFlags(a, &cfg.promlogConfig) @@ -473,6 +483,7 @@ func main() { cfg.web.TSDBDir = cfg.localStoragePath cfg.web.LocalStorage = localStorage cfg.web.Storage = fanoutStorage + cfg.web.ExemplarStorage = localStorage cfg.web.QueryEngine = queryEngine cfg.web.ScrapeManager = scrapeManager cfg.web.RuleManager = ruleManager @@ -1100,6 +1111,13 @@ func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (stor return nil, tsdb.ErrNotReady } +func (s *readyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { + if x := s.get(); x != nil { + return x.ExemplarQuerier(ctx) + } + return nil, tsdb.ErrNotReady +} + // Appender implements the Storage interface. 
func (s *readyStorage) Appender(ctx context.Context) storage.Appender { if x := s.get(); x != nil { @@ -1114,6 +1132,10 @@ func (n notReadyAppender) Append(ref uint64, l labels.Labels, t int64, v float64 return 0, tsdb.ErrNotReady } +func (n notReadyAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + return 0, tsdb.ErrNotReady +} + func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady } @@ -1199,6 +1221,7 @@ type tsdbOptions struct { StripeSize int MinBlockDuration model.Duration MaxBlockDuration model.Duration + MaxExemplars int } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -1212,6 +1235,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { StripeSize: opts.StripeSize, MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), + MaxExemplars: opts.MaxExemplars, } } diff --git a/docs/querying/api.md b/docs/querying/api.md index 1fee171eb1..22b4c87700 100644 --- a/docs/querying/api.md +++ b/docs/querying/api.md @@ -344,6 +344,72 @@ $ curl http://localhost:9090/api/v1/label/job/values } ``` +## Querying exemplars + +This is **experimental** and might change in the future. +The following endpoint returns a list of exemplars for a valid PromQL query for a specific time range: + +``` +GET /api/v1/query_exemplars +POST /api/v1/query_exemplars +``` + +URL query parameters: + +- `query=`: Prometheus expression query string. +- `start=`: Start timestamp. +- `end=`: End timestamp. 
+
+```json
+$ curl -g 'http://localhost:9090/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'
+{
+    "status": "success",
+    "data": [
+        {
+            "seriesLabels": {
+                "__name__": "test_exemplar_metric_total",
+                "instance": "localhost:8090",
+                "job": "prometheus",
+                "service": "bar"
+            },
+            "exemplars": [
+                {
+                    "labels": {
+                        "traceID": "EpTxMJ40fUus7aGY"
+                    },
+                    "value": "6",
+                    "timestamp": 1600096945.479
+                }
+            ]
+        },
+        {
+            "seriesLabels": {
+                "__name__": "test_exemplar_metric_total",
+                "instance": "localhost:8090",
+                "job": "prometheus",
+                "service": "foo"
+            },
+            "exemplars": [
+                {
+                    "labels": {
+                        "traceID": "Olp9XHlq763ccsfa"
+                    },
+                    "value": "19",
+                    "timestamp": 1600096955.479
+                },
+                {
+                    "labels": {
+                        "traceID": "hCtjygkIHwAN9vs4"
+                    },
+                    "value": "20",
+                    "timestamp": 1600096965.489
+                }
+            ]
+        }
+    ]
+}
+```
+
 ## Expression query result formats
 
 Expression queries may return the following response values in the `result`
diff --git a/pkg/exemplar/exemplar.go b/pkg/exemplar/exemplar.go
index c6ea0db94d..8e3e01b5c9 100644
--- a/pkg/exemplar/exemplar.go
+++ b/pkg/exemplar/exemplar.go
@@ -22,3 +22,25 @@ type Exemplar struct {
 	HasTs  bool
 	Ts     int64
 }
+
+type QueryResult struct {
+	SeriesLabels labels.Labels `json:"seriesLabels"`
+	Exemplars    []Exemplar    `json:"exemplars"`
+}
+
+// Equals compares if the exemplar e is the same as e2. Note that if HasTs is false for
+// both exemplars then the timestamps will be ignored for the comparison. This can come up
+// when an exemplar is exported without its own timestamp, in which case the scrape timestamp
+// is assigned to the Ts field. However we still want to treat the same exemplar, scraped without
+// an exported timestamp, as a duplicate of itself for each subsequent scrape.
+func (e Exemplar) Equals(e2 Exemplar) bool { + if !labels.Equal(e.Labels, e2.Labels) { + return false + } + + if (e.HasTs || e2.HasTs) && e.Ts != e2.Ts { + return false + } + + return e.Value == e2.Value +} diff --git a/pkg/textparse/openmetricsparse_test.go b/pkg/textparse/openmetricsparse_test.go index fd4ee3ba00..39567650c4 100644 --- a/pkg/textparse/openmetricsparse_test.go +++ b/pkg/textparse/openmetricsparse_test.go @@ -230,9 +230,10 @@ foo_total 17.0 1520879607.789 # {xx="yy"} 5` var e exemplar.Exemplar p.Metric(&res) found := p.Exemplar(&e) - require.Equal(t, exp[i].m, string(m)) - require.Equal(t, exp[i].t, ts) + if e.HasTs { + require.Equal(t, exp[i].t, ts) + } require.Equal(t, exp[i].v, v) require.Equal(t, exp[i].lset, res) if exp[i].e == nil { diff --git a/promql/parser/ast.go b/promql/parser/ast.go index a8754f2405..7762425640 100644 --- a/promql/parser/ast.go +++ b/promql/parser/ast.go @@ -316,6 +316,18 @@ func Walk(v Visitor, node Node, path []Node) error { return err } +func ExtractSelectors(expr Expr) [][]*labels.Matcher { + var selectors [][]*labels.Matcher + Inspect(expr, func(node Node, _ []Node) error { + vs, ok := node.(*VectorSelector) + if ok { + selectors = append(selectors, vs.LabelMatchers) + } + return nil + }) + return selectors +} + type inspector func(Node, []Node) error func (f inspector) Visit(node Node, path []Node) (Visitor, error) { diff --git a/promql/parser/parse_test.go b/promql/parser/parse_test.go index 475f15f56a..d5dcf9ed93 100644 --- a/promql/parser/parse_test.go +++ b/promql/parser/parse_test.go @@ -3370,3 +3370,39 @@ func TestRecoverParserError(t *testing.T) { panic(e) } + +func TestExtractSelectors(t *testing.T) { + for _, tc := range [...]struct { + input string + expected []string + }{ + { + "foo", + []string{`{__name__="foo"}`}, + }, { + `foo{bar="baz"}`, + []string{`{bar="baz", __name__="foo"}`}, + }, { + `foo{bar="baz"} / flip{flop="flap"}`, + []string{`{bar="baz", __name__="foo"}`, `{flop="flap", 
__name__="flip"}`}, + }, { + `rate(foo[5m])`, + []string{`{__name__="foo"}`}, + }, { + `vector(1)`, + []string{}, + }, + } { + expr, err := ParseExpr(tc.input) + require.NoError(t, err) + + var expected [][]*labels.Matcher + for _, s := range tc.expected { + selector, err := ParseMetricSelector(s) + require.NoError(t, err) + expected = append(expected, selector) + } + + require.Equal(t, expected, ExtractSelectors(expr)) + } +} diff --git a/promql/test.go b/promql/test.go index 2e48f25cce..a60b408907 100644 --- a/promql/test.go +++ b/promql/test.go @@ -108,6 +108,15 @@ func (t *Test) TSDB() *tsdb.DB { return t.storage.DB } +// ExemplarStorage returns the test's exemplar storage. +func (t *Test) ExemplarStorage() storage.ExemplarStorage { + return t.storage +} + +func (t *Test) ExemplarQueryable() storage.ExemplarQueryable { + return t.storage.ExemplarQueryable() +} + func raise(line int, format string, v ...interface{}) error { return &parser.ParseErr{ LineOffset: line, diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index e82b41a613..da29d6c12c 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -15,7 +15,9 @@ package scrape import ( "context" + "math/rand" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) @@ -29,8 +31,11 @@ func (a nopAppendable) Appender(_ context.Context) storage.Appender { type nopAppender struct{} func (a nopAppender) Append(uint64, labels.Labels, int64, float64) (uint64, error) { return 0, nil } -func (a nopAppender) Commit() error { return nil } -func (a nopAppender) Rollback() error { return nil } +func (a nopAppender) AppendExemplar(uint64, labels.Labels, exemplar.Exemplar) (uint64, error) { + return 0, nil +} +func (a nopAppender) Commit() error { return nil } +func (a nopAppender) Rollback() error { return nil } type sample struct { metric labels.Labels @@ -45,6 +50,8 @@ type collectResultAppender struct { result 
[]sample pendingResult []sample rolledbackResult []sample + pendingExemplars []exemplar.Exemplar + resultExemplars []exemplar.Exemplar } func (a *collectResultAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { @@ -54,21 +61,34 @@ func (a *collectResultAppender) Append(ref uint64, lset labels.Labels, t int64, v: v, }) + if ref == 0 { + ref = rand.Uint64() + } if a.next == nil { - return 0, nil + return ref, nil } ref, err := a.next.Append(ref, lset, t, v) if err != nil { return 0, err } - return ref, err } +func (a *collectResultAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + a.pendingExemplars = append(a.pendingExemplars, e) + if a.next == nil { + return 0, nil + } + + return a.next.AppendExemplar(ref, l, e) +} + func (a *collectResultAppender) Commit() error { a.result = append(a.result, a.pendingResult...) + a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...) a.pendingResult = nil + a.pendingExemplars = nil if a.next == nil { return nil } diff --git a/scrape/scrape.go b/scrape/scrape.go index bddcdf0d77..67440ae788 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -38,6 +38,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/pool" "github.com/prometheus/prometheus/pkg/relabel" @@ -142,19 +143,19 @@ var ( targetScrapeSampleDuplicate = prometheus.NewCounter( prometheus.CounterOpts{ Name: "prometheus_target_scrapes_sample_duplicate_timestamp_total", - Help: "Total number of samples rejected due to duplicate timestamps but different values", + Help: "Total number of samples rejected due to duplicate timestamps but different values.", }, ) targetScrapeSampleOutOfOrder = prometheus.NewCounter( prometheus.CounterOpts{ Name: 
"prometheus_target_scrapes_sample_out_of_order_total", - Help: "Total number of samples rejected due to not being out of the expected order", + Help: "Total number of samples rejected due to not being out of the expected order.", }, ) targetScrapeSampleOutOfBounds = prometheus.NewCounter( prometheus.CounterOpts{ Name: "prometheus_target_scrapes_sample_out_of_bounds_total", - Help: "Total number of samples rejected due to timestamp falling outside of the time bounds", + Help: "Total number of samples rejected due to timestamp falling outside of the time bounds.", }, ) targetScrapeCacheFlushForced = prometheus.NewCounter( @@ -163,6 +164,12 @@ var ( Help: "How many times a scrape cache was flushed due to getting big while scrapes are failing.", }, ) + targetScrapeExemplarOutOfOrder = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_target_scrapes_exemplar_out_of_order_total", + Help: "Total number of exemplar rejected due to not being out of the expected order.", + }, + ) ) func init() { @@ -184,6 +191,7 @@ func init() { targetScrapePoolTargetsAdded, targetScrapeCacheFlushForced, targetMetadataCache, + targetScrapeExemplarOutOfOrder, ) } @@ -1243,9 +1251,10 @@ func (sl *scrapeLoop) getCache() *scrapeCache { } type appendErrors struct { - numOutOfOrder int - numDuplicates int - numOutOfBounds int + numOutOfOrder int + numDuplicates int + numOutOfBounds int + numExemplarOutOfOrder int } func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { @@ -1270,6 +1279,7 @@ loop: var ( et textparse.Entry sampleAdded bool + e exemplar.Exemplar ) if et, err = p.Next(); err != nil { if err == io.EOF { @@ -1360,6 +1370,18 @@ loop: // number of samples remaining after relabeling. 
added++ + if hasExemplar := p.Exemplar(&e); hasExemplar { + if !e.HasTs { + e.Ts = t + } + _, exemplarErr := app.AppendExemplar(ref, lset, e) + exemplarErr = sl.checkAddExemplarError(exemplarErr, e, &appErrs) + if exemplarErr != nil { + // Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors. + level.Debug(sl.l).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) + } + } + } if sampleLimitErr != nil { if err == nil { @@ -1377,6 +1399,9 @@ loop: if appErrs.numOutOfBounds > 0 { level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds) } + if appErrs.numExemplarOutOfOrder > 0 { + level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) + } if err == nil { sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. @@ -1434,6 +1459,20 @@ func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err e } } +func (sl *scrapeLoop) checkAddExemplarError(err error, e exemplar.Exemplar, appErrs *appendErrors) error { + switch errors.Cause(err) { + case storage.ErrNotFound: + return storage.ErrNotFound + case storage.ErrOutOfOrderExemplar: + appErrs.numExemplarOutOfOrder++ + level.Debug(sl.l).Log("msg", "Out of order exemplar", "exemplar", fmt.Sprintf("%+v", e)) + targetScrapeExemplarOutOfOrder.Inc() + return nil + default: + return err + } +} + // The constants are suffixed with the invalid \xff unicode rune to avoid collisions // with scraped metrics in the cache. 
const ( diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 928d820da3..f21d10157c 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/pkg/textparse" @@ -1499,6 +1500,168 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { require.Equal(t, want, app.result, "Appended samples not as expected") } +func TestScrapeLoopAppendExemplar(t *testing.T) { + tests := []struct { + title string + scrapeText string + discoveryLabels []string + samples []sample + exemplars []exemplar.Exemplar + }{ + { + title: "Metric without exemplars", + scrapeText: "metric_total{n=\"1\"} 0\n# EOF", + discoveryLabels: []string{"n", "2"}, + samples: []sample{{ + metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), + v: 0, + }}, + }, + { + title: "Metric with exemplars", + scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF", + discoveryLabels: []string{"n", "2"}, + samples: []sample{{ + metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), + v: 0, + }}, + exemplars: []exemplar.Exemplar{ + {Labels: labels.FromStrings("a", "abc"), Value: 1}, + }, + }, { + title: "Metric with exemplars and TS", + scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF", + discoveryLabels: []string{"n", "2"}, + samples: []sample{{ + metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), + v: 0, + }}, + exemplars: []exemplar.Exemplar{ + {Labels: labels.FromStrings("a", "abc"), Value: 1, Ts: 10000000, HasTs: true}, + }, + }, { + title: "Two metrics and exemplars", + scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000 +metric_total{n="2"} 2 # {t="2"} 2.0 20000 +# EOF`, 
+ samples: []sample{{ + metric: labels.FromStrings("__name__", "metric_total", "n", "1"), + v: 1, + }, { + metric: labels.FromStrings("__name__", "metric_total", "n", "2"), + v: 2, + }}, + exemplars: []exemplar.Exemplar{ + {Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true}, + {Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true}, + }, + }, + } + + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + app := &collectResultAppender{} + + discoveryLabels := &Target{ + labels: labels.FromStrings(test.discoveryLabels...), + } + + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + func(l labels.Labels) labels.Labels { + return mutateSampleLabels(l, discoveryLabels, false, nil) + }, + func(l labels.Labels) labels.Labels { + return mutateReportSampleLabels(l, discoveryLabels) + }, + func(ctx context.Context) storage.Appender { return app }, + nil, + 0, + true, + ) + + now := time.Now() + + for i := range test.samples { + test.samples[i].t = timestamp.FromTime(now) + } + + // We need to set the timestamp for expected exemplars that does not have a timestamp. 
+ for i := range test.exemplars { + if test.exemplars[i].Ts == 0 { + test.exemplars[i].Ts = timestamp.FromTime(now) + } + } + + _, _, _, err := sl.append(app, []byte(test.scrapeText), "application/openmetrics-text", now) + require.NoError(t, err) + require.NoError(t, app.Commit()) + require.Equal(t, test.samples, app.result) + require.Equal(t, test.exemplars, app.resultExemplars) + }) + } +} + +func TestScrapeLoopAppendExemplarSeries(t *testing.T) { + scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000 +# EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000 +# EOF`} + samples := []sample{{ + metric: labels.FromStrings("__name__", "metric_total", "n", "1"), + v: 1, + }, { + metric: labels.FromStrings("__name__", "metric_total", "n", "1"), + v: 2, + }} + exemplars := []exemplar.Exemplar{ + {Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true}, + {Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true}, + } + discoveryLabels := &Target{ + labels: labels.FromStrings(), + } + + app := &collectResultAppender{} + + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + func(l labels.Labels) labels.Labels { + return mutateSampleLabels(l, discoveryLabels, false, nil) + }, + func(l labels.Labels) labels.Labels { + return mutateReportSampleLabels(l, discoveryLabels) + }, + func(ctx context.Context) storage.Appender { return app }, + nil, + 0, + true, + ) + + now := time.Now() + + for i := range samples { + ts := now.Add(time.Second * time.Duration(i)) + samples[i].t = timestamp.FromTime(ts) + } + + // We need to set the timestamp for expected exemplars that does not have a timestamp. 
+ for i := range exemplars { + if exemplars[i].Ts == 0 { + ts := now.Add(time.Second * time.Duration(i)) + exemplars[i].Ts = timestamp.FromTime(ts) + } + } + + for i, st := range scrapeText { + _, _, _, err := sl.append(app, []byte(st), "application/openmetrics-text", timestamp.Time(samples[i].t)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + require.Equal(t, samples, app.result) + require.Equal(t, exemplars, app.resultExemplars) +} + func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { var ( scraper = &testScraper{} diff --git a/storage/fanout.go b/storage/fanout.go index 91872b67a7..b737d63008 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -20,6 +20,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" ) @@ -157,6 +158,20 @@ func (f *fanoutAppender) Append(ref uint64, l labels.Labels, t int64, v float64) return ref, nil } +func (f *fanoutAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + ref, err := f.primary.AppendExemplar(ref, l, e) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.AppendExemplar(ref, l, e); err != nil { + return 0, err + } + } + return ref, nil +} + func (f *fanoutAppender) Commit() (err error) { err = f.primary.Commit() diff --git a/storage/interface.go b/storage/interface.go index eba63b4ce4..f5fd19df86 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -17,6 +17,7 @@ import ( "context" "errors" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -28,6 +29,7 @@ var ( ErrOutOfOrderSample = errors.New("out of order sample") ErrDuplicateSampleForTimestamp = 
errors.New("duplicate sample for timestamp") ErrOutOfBounds = errors.New("out of bounds") + ErrOutOfOrderExemplar = errors.New("out of order exemplar") ) // Appendable allows creating appenders. @@ -45,7 +47,7 @@ type SampleAndChunkQueryable interface { } // Storage ingests and manages samples, along with various indexes. All methods -// are goroutine-safe. Storage implements storage.SampleAppender. +// are goroutine-safe. Storage implements storage.Appender. type Storage interface { SampleAndChunkQueryable Appendable @@ -57,6 +59,13 @@ type Storage interface { Close() error } +// ExemplarStorage ingests and manages exemplars, along with various indexes. All methods are +// goroutine-safe. ExemplarStorage implements storage.ExemplarAppender and storage.ExemplarQuerier. +type ExemplarStorage interface { + ExemplarQueryable + ExemplarAppender +} + // A Queryable handles queries against a storage. // Use it when you need to have access to all samples without chunk encoding abstraction e.g promQL. type Queryable interface { @@ -107,6 +116,18 @@ type LabelQuerier interface { Close() error } +type ExemplarQueryable interface { + // ExemplarQuerier returns a new ExemplarQuerier on the storage. + ExemplarQuerier(ctx context.Context) (ExemplarQuerier, error) +} + +// Querier provides reading access to time series data. +type ExemplarQuerier interface { + // Select all the exemplars that match the matchers. + // Within a single slice of matchers, it is an intersection. Between the slices, it is a union. + Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) +} + // SelectHints specifies hints passed for data selections. // This is used only as an option for implementation to use. type SelectHints struct { @@ -155,6 +176,25 @@ type Appender interface { // Rollback rolls back all modifications made in the appender so far. // Appender has to be discarded after rollback. 
Rollback() error + + ExemplarAppender +} + +// ExemplarAppender provides an interface for adding samples to exemplar storage, which +// within Prometheus is in-memory only. +type ExemplarAppender interface { + // AppendExemplar adds an exemplar for the given series labels. + // An optional reference number can be provided to accelerate calls. + // A reference number is returned which can be used to add further + // exemplars in the same or later transactions. + // Returned reference numbers are ephemeral and may be rejected in calls + // to Append() at any point. Adding the sample via Append() returns a new + // reference number. + // If the reference is 0 it must not be used for caching. + // Note that in our current implementation of Prometheus' exemplar storage + // calls to Append should generate the reference numbers, AppendExemplar + // generating a new reference number should be considered possible erroneous behaviour and be logged. + AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) } // SeriesSet contains a set of series. diff --git a/storage/remote/write.go b/storage/remote/write.go index 4b75eca339..a9270630fb 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/wal" @@ -222,6 +223,10 @@ func (t *timestampTracker) Append(_ uint64, _ labels.Labels, ts int64, _ float64 return 0, nil } +func (t *timestampTracker) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.Exemplar) (uint64, error) { + return 0, nil +} + // Commit implements storage.Appender. 
func (t *timestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index a38ccd63ba..49f9643141 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -132,3 +133,8 @@ func (m *mockAppendable) Commit() error { func (*mockAppendable) Rollback() error { return fmt.Errorf("not implemented") } + +func (*mockAppendable) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + // noop until we implement exemplars over remote write + return 0, nil +} diff --git a/tsdb/db.go b/tsdb/db.go index a56ca7360c..8d58f5c580 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -136,6 +136,10 @@ type Options struct { // It is always the default time and size based retention in Prometheus and // mainly meant for external users who import TSDB. BlocksToDelete BlocksToDeleteFunc + + // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. + // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage. 
+ MaxExemplars int } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -663,6 +667,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize headOpts.StripeSize = opts.StripeSize headOpts.SeriesCallback = opts.SeriesLifecycleCallback + headOpts.NumExemplars = opts.MaxExemplars db.head, err = NewHead(r, l, wlog, headOpts) if err != nil { return nil, err @@ -1506,6 +1511,10 @@ func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQu return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil } +func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { + return db.head.exemplars.ExemplarQuerier(ctx) +} + func rangeForTimestamp(t int64, width int64) (maxt int64) { return (t/width)*width + width } diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go new file mode 100644 index 0000000000..5a42f30a88 --- /dev/null +++ b/tsdb/exemplar.go @@ -0,0 +1,209 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tsdb + +import ( + "context" + "sort" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +type CircularExemplarStorage struct { + outOfOrderExemplars prometheus.Counter + + lock sync.RWMutex + exemplars []*circularBufferEntry + nextIndex int + + // Map of series labels as a string to index entry, which points to the first + // and last exemplar for the series in the exemplars circular buffer. + index map[string]*indexEntry +} + +type indexEntry struct { + first int + last int +} + +type circularBufferEntry struct { + exemplar exemplar.Exemplar + seriesLabels labels.Labels + next int +} + +// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in +// 1GB of extra memory, accounting for the fact that this is heap allocated space. +// If len < 1, then the exemplar storage is disabled. +func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarStorage, error) { + if len < 1 { + return &noopExemplarStorage{}, nil + } + c := &CircularExemplarStorage{ + exemplars: make([]*circularBufferEntry, len), + index: make(map[string]*indexEntry), + outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total", + Help: "Total number of out of order exemplar ingestion failed attempts", + }), + } + + if reg != nil { + reg.MustRegister(c.outOfOrderExemplars) + } + + return c, nil +} + +func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage { + return ce +} + +func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error) { + return ce, nil +} + +func (ce *CircularExemplarStorage) Querier(ctx context.Context) (storage.ExemplarQuerier, error) { + return ce, nil +} + +// Select returns exemplars for a given set of label matchers. 
+func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
+	ret := make([]exemplar.QueryResult, 0)
+
+	ce.lock.RLock()
+	defer ce.lock.RUnlock()
+
+	// Loop through each index entry, which will point us to first/last exemplar for each series.
+	for _, idx := range ce.index {
+		var se exemplar.QueryResult
+		e := ce.exemplars[idx.first]
+		if !matchesSomeMatcherSet(e.seriesLabels, matchers) {
+			continue
+		}
+		se.SeriesLabels = e.seriesLabels
+
+		// Loop through all exemplars in the circular buffer for the current series.
+		for e.exemplar.Ts <= end {
+			if e.exemplar.Ts >= start {
+				se.Exemplars = append(se.Exemplars, e.exemplar)
+			}
+			if e.next == -1 {
+				break
+			}
+			e = ce.exemplars[e.next]
+		}
+		if len(se.Exemplars) > 0 {
+			ret = append(ret, se)
+		}
+	}
+
+	sort.Slice(ret, func(i, j int) bool {
+		return labels.Compare(ret[i].SeriesLabels, ret[j].SeriesLabels) < 0
+	})
+
+	return ret, nil
+}
+
+func matchesSomeMatcherSet(lbls labels.Labels, matchers [][]*labels.Matcher) bool {
+Outer:
+	for _, ms := range matchers {
+		for _, m := range ms {
+			if !m.Matches(lbls.Get(m.Name)) {
+				continue Outer
+			}
+		}
+		return true
+	}
+	return false
+}
+
+// indexGc takes the circularBufferEntry that will be overwritten and updates the
+// storage's index for that entry's labelset if necessary.
+func (ce *CircularExemplarStorage) indexGc(cbe *circularBufferEntry) {
+	if cbe == nil {
+		return
+	}
+
+	l := cbe.seriesLabels.String()
+	i := cbe.next
+	if i == -1 {
+		delete(ce.index, l)
+		return
+	}
+
+	ce.index[l] = &indexEntry{i, ce.index[l].last}
+}
+
+func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
+	seriesLabels := l.String()
+	ce.lock.Lock()
+	defer ce.lock.Unlock()
+
+	idx, ok := ce.index[seriesLabels]
+	if !ok {
+		ce.indexGc(ce.exemplars[ce.nextIndex])
+		// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
+		// since this is the first exemplar stored for this series.
+		ce.exemplars[ce.nextIndex] = &circularBufferEntry{
+			exemplar:     e,
+			seriesLabels: l,
+			next:         -1}
+		ce.index[seriesLabels] = &indexEntry{ce.nextIndex, ce.nextIndex}
+		ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
+		return nil
+	}
+
+	// Check for duplicate vs last stored exemplar for this series.
+	// NB these are expected, and appending them is a no-op.
+ if ce.exemplars[idx.last].exemplar.Equals(e) { + return nil + } + + if e.Ts <= ce.exemplars[idx.last].exemplar.Ts { + ce.outOfOrderExemplars.Inc() + return storage.ErrOutOfOrderExemplar + } + ce.indexGc(ce.exemplars[ce.nextIndex]) + ce.exemplars[ce.nextIndex] = &circularBufferEntry{ + exemplar: e, + seriesLabels: l, + next: -1, + } + + ce.exemplars[ce.index[seriesLabels].last].next = ce.nextIndex + ce.index[seriesLabels].last = ce.nextIndex + ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) + return nil +} + +type noopExemplarStorage struct{} + +func (noopExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { + return nil +} + +func (noopExemplarStorage) ExemplarQuerier(context.Context) (storage.ExemplarQuerier, error) { + return &noopExemplarQuerier{}, nil +} + +type noopExemplarQuerier struct{} + +func (noopExemplarQuerier) Select(_, _ int64, _ ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { + return nil, nil +} diff --git a/tsdb/exemplar_test.go b/tsdb/exemplar_test.go new file mode 100644 index 0000000000..bf76502ff2 --- /dev/null +++ b/tsdb/exemplar_test.go @@ -0,0 +1,316 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tsdb + +import ( + "reflect" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +func TestAddExemplar(t *testing.T) { + exs, err := NewCircularExemplarStorage(2, nil) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + l := labels.Labels{ + {Name: "service", Value: "asdf"}, + } + e := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "qwerty", + }, + }, + Value: 0.1, + Ts: 1, + } + + err = es.AddExemplar(l, e) + require.NoError(t, err) + require.Equal(t, es.index[l.String()].last, 0, "exemplar was not stored correctly") + + e2 := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "zxcvb", + }, + }, + Value: 0.1, + Ts: 2, + } + + err = es.AddExemplar(l, e2) + require.NoError(t, err) + require.Equal(t, es.index[l.String()].last, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update") + require.True(t, es.exemplars[es.index[l.String()].last].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].last].exemplar) + + err = es.AddExemplar(l, e2) + require.NoError(t, err, "no error is expected attempting to add duplicate exemplar") + + e3 := e2 + e3.Ts = 3 + err = es.AddExemplar(l, e3) + require.NoError(t, err, "no error is expected when attempting to add duplicate exemplar, even with different timestamp") + + e3.Ts = 1 + e3.Value = 0.3 + err = es.AddExemplar(l, e3) + require.Equal(t, err, storage.ErrOutOfOrderExemplar) +} + +func TestStorageOverflow(t *testing.T) { + // Test that circular buffer index and assignment + // works properly, adding more exemplars than can + // be stored and then querying for them. 
+ exs, err := NewCircularExemplarStorage(5, nil) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + l := labels.Labels{ + {Name: "service", Value: "asdf"}, + } + + var eList []exemplar.Exemplar + for i := 0; i < len(es.exemplars)+1; i++ { + e := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "a", + }, + }, + Value: float64(i+1) / 10, + Ts: int64(101 + i), + } + es.AddExemplar(l, e) + eList = append(eList, e) + } + require.True(t, (es.exemplars[0].exemplar.Ts == 106), "exemplar was not stored correctly") + + m, err := labels.NewMatcher(labels.MatchEqual, l[0].Name, l[0].Value) + require.NoError(t, err, "error creating label matcher for exemplar query") + ret, err := es.Select(100, 110, []*labels.Matcher{m}) + require.NoError(t, err) + require.True(t, len(ret) == 1, "select should have returned samples for a single series only") + + require.True(t, reflect.DeepEqual(eList[1:], ret[0].Exemplars), "select did not return expected exemplars\n\texpected: %+v\n\tactual: %+v\n", eList[1:], ret[0].Exemplars) +} + +func TestSelectExemplar(t *testing.T) { + exs, err := NewCircularExemplarStorage(5, nil) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + l := labels.Labels{ + {Name: "service", Value: "asdf"}, + } + e := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "qwerty", + }, + }, + Value: 0.1, + Ts: 12, + } + + err = es.AddExemplar(l, e) + require.NoError(t, err, "adding exemplar failed") + require.True(t, reflect.DeepEqual(es.exemplars[0].exemplar, e), "exemplar was not stored correctly") + + m, err := labels.NewMatcher(labels.MatchEqual, l[0].Name, l[0].Value) + require.NoError(t, err, "error creating label matcher for exemplar query") + ret, err := es.Select(0, 100, []*labels.Matcher{m}) + require.NoError(t, err) + require.True(t, len(ret) == 1, "select should have returned samples for a single series only") + + expectedResult := []exemplar.Exemplar{e} 
+ require.True(t, reflect.DeepEqual(expectedResult, ret[0].Exemplars), "select did not return expected exemplars\n\texpected: %+v\n\tactual: %+v\n", expectedResult, ret[0].Exemplars) +} + +func TestSelectExemplar_MultiSeries(t *testing.T) { + exs, err := NewCircularExemplarStorage(5, nil) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + l1 := labels.Labels{ + {Name: "__name__", Value: "test_metric"}, + {Name: "service", Value: "asdf"}, + } + l2 := labels.Labels{ + {Name: "__name__", Value: "test_metric2"}, + {Name: "service", Value: "qwer"}, + } + + for i := 0; i < len(es.exemplars); i++ { + e1 := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "a", + }, + }, + Value: float64(i+1) / 10, + Ts: int64(101 + i), + } + err = es.AddExemplar(l1, e1) + require.NoError(t, err) + + e2 := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "b", + }, + }, + Value: float64(i+1) / 10, + Ts: int64(101 + i), + } + err = es.AddExemplar(l2, e2) + require.NoError(t, err) + } + + m, err := labels.NewMatcher(labels.MatchEqual, l2[0].Name, l2[0].Value) + require.NoError(t, err, "error creating label matcher for exemplar query") + ret, err := es.Select(100, 200, []*labels.Matcher{m}) + require.NoError(t, err) + require.True(t, len(ret) == 1, "select should have returned samples for a single series only") + require.True(t, len(ret[0].Exemplars) == 3, "didn't get expected 8 exemplars, got %d", len(ret[0].Exemplars)) + + m, err = labels.NewMatcher(labels.MatchEqual, l1[0].Name, l1[0].Value) + require.NoError(t, err, "error creating label matcher for exemplar query") + ret, err = es.Select(100, 200, []*labels.Matcher{m}) + require.NoError(t, err) + require.True(t, len(ret) == 1, "select should have returned samples for a single series only") + require.True(t, len(ret[0].Exemplars) == 2, "didn't get expected 8 exemplars, got %d", len(ret[0].Exemplars)) +} + +func TestSelectExemplar_TimeRange(t 
*testing.T) { + lenEs := 5 + exs, err := NewCircularExemplarStorage(lenEs, nil) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + l := labels.Labels{ + {Name: "service", Value: "asdf"}, + } + + for i := 0; i < lenEs; i++ { + err := es.AddExemplar(l, exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: strconv.Itoa(i), + }, + }, + Value: 0.1, + Ts: int64(101 + i), + }) + require.NoError(t, err) + require.Equal(t, es.index[l.String()].last, i, "exemplar was not stored correctly") + } + + m, err := labels.NewMatcher(labels.MatchEqual, l[0].Name, l[0].Value) + require.NoError(t, err, "error creating label matcher for exemplar query") + ret, err := es.Select(102, 104, []*labels.Matcher{m}) + require.NoError(t, err) + require.True(t, len(ret) == 1, "select should have returned samples for a single series only") + require.True(t, len(ret[0].Exemplars) == 3, "didn't get expected two exemplars %d, %+v", len(ret[0].Exemplars), ret) +} + +// Test to ensure that even though a series matches more than one matcher from the +// query that it's exemplars are only included in the result a single time. +func TestSelectExemplar_DuplicateSeries(t *testing.T) { + exs, err := NewCircularExemplarStorage(4, nil) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + e := exemplar.Exemplar{ + Labels: labels.Labels{ + labels.Label{ + Name: "traceID", + Value: "qwerty", + }, + }, + Value: 0.1, + Ts: 12, + } + + l := labels.Labels{ + {Name: "service", Value: "asdf"}, + {Name: "cluster", Value: "us-central1"}, + } + + // Lets just assume somehow the PromQL expression generated two separate lists of matchers, + // both of which can select this particular series. 
+ m := [][]*labels.Matcher{ + { + labels.MustNewMatcher(labels.MatchEqual, l[0].Name, l[0].Value), + }, + { + labels.MustNewMatcher(labels.MatchEqual, l[1].Name, l[1].Value), + }, + } + + err = es.AddExemplar(l, e) + require.NoError(t, err, "adding exemplar failed") + require.True(t, reflect.DeepEqual(es.exemplars[0].exemplar, e), "exemplar was not stored correctly") + + ret, err := es.Select(0, 100, m...) + require.NoError(t, err) + require.True(t, len(ret) == 1, "select should have returned samples for a single series only") +} + +func TestIndexOverwrite(t *testing.T) { + exs, err := NewCircularExemplarStorage(2, nil) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + l1 := labels.Labels{ + {Name: "service", Value: "asdf"}, + } + + l2 := labels.Labels{ + {Name: "service", Value: "qwer"}, + } + + err = es.AddExemplar(l1, exemplar.Exemplar{Value: 1, Ts: 1}) + require.NoError(t, err) + err = es.AddExemplar(l2, exemplar.Exemplar{Value: 2, Ts: 2}) + require.NoError(t, err) + err = es.AddExemplar(l2, exemplar.Exemplar{Value: 3, Ts: 3}) + require.NoError(t, err) + + // Ensure index GC'ing is taking place, there should no longer be any + // index entry for series l1 since we just wrote two exemplars for series l2. 
+ _, ok := es.index[l1.String()] + require.False(t, ok) + require.Equal(t, &indexEntry{1, 0}, es.index[l2.String()]) + + err = es.AddExemplar(l1, exemplar.Exemplar{Value: 4, Ts: 4}) + require.NoError(t, err) + + i := es.index[l2.String()] + require.Equal(t, &indexEntry{0, 0}, i) +} diff --git a/tsdb/head.go b/tsdb/head.go index 31e644da28..edf5600812 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -46,11 +47,19 @@ var ( // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") + // ErrInvalidExemplar is returned if an appended exemplar is not valid and can't + // be ingested. + ErrInvalidExemplar = errors.New("invalid exemplar") // ErrAppenderClosed is returned if an appender has already be successfully // rolled back or committed. ErrAppenderClosed = errors.New("appender closed") ) +type ExemplarStorage interface { + storage.ExemplarQueryable + AddExemplar(labels.Labels, exemplar.Exemplar) error +} + // Head handles reads and writes of time series data within a time window. type Head struct { chunkRange atomic.Int64 @@ -60,14 +69,16 @@ type Head struct { lastWALTruncationTime atomic.Int64 lastSeriesID atomic.Uint64 - metrics *headMetrics - opts *HeadOptions - wal *wal.WAL - logger log.Logger - appendPool sync.Pool - seriesPool sync.Pool - bytesPool sync.Pool - memChunkPool sync.Pool + metrics *headMetrics + opts *HeadOptions + wal *wal.WAL + exemplars ExemplarStorage + logger log.Logger + appendPool sync.Pool + exemplarsPool sync.Pool + seriesPool sync.Pool + bytesPool sync.Pool + memChunkPool sync.Pool // All series addressable by their ID or hash. 
series *stripeSeries @@ -107,6 +118,7 @@ type HeadOptions struct { // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. StripeSize int SeriesCallback SeriesLifecycleCallback + NumExemplars int } func DefaultHeadOptions() *HeadOptions { @@ -133,6 +145,7 @@ type headMetrics struct { samplesAppended prometheus.Counter outOfBoundSamples prometheus.Counter outOfOrderSamples prometheus.Counter + outOfOrderExemplars prometheus.Counter walTruncateDuration prometheus.Summary walCorruptionsTotal prometheus.Counter walTotalReplayDuration prometheus.Gauge @@ -209,6 +222,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_out_of_order_samples_total", Help: "Total number of out of order samples ingestion failed attempts.", }), + outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_out_of_order_exemplars_total", + Help: "Total number of out of order exemplars ingestion failed attempts.", + }), headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_truncations_failed_total", Help: "Total number of head truncations that failed.", @@ -256,6 +273,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.samplesAppended, m.outOfBoundSamples, m.outOfOrderSamples, + m.outOfOrderExemplars, m.headTruncateFail, m.headTruncateTotal, m.checkpointDeleteFail, @@ -325,10 +343,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti if opts.SeriesCallback == nil { opts.SeriesCallback = &noopSeriesLifecycleCallback{} } + + es, err := NewCircularExemplarStorage(opts.NumExemplars, r) + if err != nil { + return nil, err + } + h := &Head{ wal: wal, logger: l, opts: opts, + exemplars: es, series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), @@ -351,7 +376,6 @@ func NewHead(r 
prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti opts.ChunkPool = chunkenc.NewPool() } - var err error h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( mmappedChunksDir(opts.ChunkDirRoot), opts.ChunkPool, @@ -366,6 +390,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") } +func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { + return h.exemplars.ExemplarQuerier(ctx) +} + // processWALSamples adds a partition of samples it receives to the head and passes // them on to other workers. // Samples before the mint timestamp are discarded. @@ -1062,6 +1090,23 @@ func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64 return a.app.Append(ref, lset, t, v) } +func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + // Check if exemplar storage is enabled. + if a.head.opts.NumExemplars == 0 { + return 0, nil + } + + if a.app != nil { + return a.app.AppendExemplar(ref, l, e) + } + // We should never reach here given we would call Append before AppendExemplar + // and we probably want to always base head/WAL min time on sample times. + a.head.initTime(e.Ts) + a.app = a.head.appender() + + return a.app.AppendExemplar(ref, l, e) +} + func (a *initAppender) Commit() error { if a.app == nil { return nil @@ -1101,8 +1146,10 @@ func (h *Head) appender() *headAppender { maxt: math.MinInt64, samples: h.getAppendBuffer(), sampleSeries: h.getSeriesBuffer(), + exemplars: h.getExemplarBuffer(), appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, + exemplarAppender: h.exemplars, } } @@ -1119,6 +1166,19 @@ func max(a, b int64) int64 { return b } +func (h *Head) ExemplarAppender() storage.ExemplarAppender { + h.metrics.activeAppenders.Inc() + + // The head cache might not have a starting point yet. 
The init appender + // picks up the first appended timestamp as the base. + if h.MinTime() == math.MaxInt64 { + return &initAppender{ + head: h, + } + } + return h.appender() +} + func (h *Head) getAppendBuffer() []record.RefSample { b := h.appendPool.Get() if b == nil { @@ -1132,6 +1192,19 @@ func (h *Head) putAppendBuffer(b []record.RefSample) { h.appendPool.Put(b[:0]) } +func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef { + b := h.exemplarsPool.Get() + if b == nil { + return make([]exemplarWithSeriesRef, 0, 512) + } + return b.([]exemplarWithSeriesRef) +} + +func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { + //nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. + h.exemplarsPool.Put(b[:0]) +} + func (h *Head) getSeriesBuffer() []*memSeries { b := h.seriesPool.Get() if b == nil { @@ -1158,13 +1231,20 @@ func (h *Head) putBytesBuffer(b []byte) { h.bytesPool.Put(b[:0]) } +type exemplarWithSeriesRef struct { + ref uint64 + exemplar exemplar.Exemplar +} + type headAppender struct { - head *Head - minValidTime int64 // No samples below this timestamp are allowed. - mint, maxt int64 + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 + exemplarAppender ExemplarStorage series []record.RefSeries samples []record.RefSample + exemplars []exemplarWithSeriesRef sampleSeries []*memSeries appendID, cleanupAppendIDsBelow uint64 @@ -1230,6 +1310,27 @@ func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64 return s.ref, nil } +// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't +// use getOrCreate or make any of the lset sanity checks that Append does. +func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exemplar) (uint64, error) { + // Check if exemplar storage is enabled. 
+ if a.head.opts.NumExemplars == 0 { + return 0, nil + } + + s := a.head.series.getByID(ref) + if s == nil { + return 0, fmt.Errorf("unknown series ref. when trying to add exemplar: %d", ref) + } + + // Ensure no empty labels have gotten through. + e.Labels = e.Labels.WithoutEmpty() + + a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e}) + + return s.ref, nil +} + func (a *headAppender) log() error { if a.head.wal == nil { return nil @@ -1271,9 +1372,21 @@ func (a *headAppender) Commit() (err error) { return errors.Wrap(err, "write to WAL") } + // No errors logging to WAL, so pass the exemplars along to the in memory storage. + for _, e := range a.exemplars { + s := a.head.series.getByID(e.ref) + err := a.exemplarAppender.AddExemplar(s.lset, e.exemplar) + if err == storage.ErrOutOfOrderExemplar { + a.head.metrics.outOfOrderExemplars.Inc() + } else if err != nil { + level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err) + } + } + defer a.head.metrics.activeAppenders.Dec() defer a.head.putAppendBuffer(a.samples) defer a.head.putSeriesBuffer(a.sampleSeries) + defer a.head.putExemplarBuffer(a.exemplars) defer a.head.iso.closeAppend(a.appendID) total := len(a.samples) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index d47a487906..5e1d73c2f7 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -30,6 +30,7 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -2014,6 +2015,28 @@ func TestHeadMintAfterTruncation(t *testing.T) { require.NoError(t, head.Close()) } +func TestHeadExemplars(t *testing.T) { + chunkRange := int64(2000) + head, _ := newTestHead(t, chunkRange, false) + app := head.Appender(context.Background()) + + l := labels.FromStrings("traceId", 
"123") + // It is perfectly valid to add Exemplars before the current start time - + // histogram buckets that haven't been update in a while could still be + // exported exemplars from an hour ago. + ref, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100) + require.NoError(t, err) + _, err = app.AppendExemplar(ref, l, exemplar.Exemplar{ + Labels: l, + HasTs: true, + Ts: -1000, + Value: 1, + }) + require.NoError(t, err) + require.NoError(t, app.Commit()) + require.NoError(t, head.Close()) +} + func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { chunkRange := int64(2000) head, _ := newTestHead(b, chunkRange, false) diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index 6fa90781e1..16b3c3c7c6 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -18,6 +18,9 @@ import ( "os" "time" + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/testutil" ) @@ -35,16 +38,22 @@ func New(t testutil.T) *TestStorage { opts := tsdb.DefaultOptions() opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) + opts.MaxExemplars = 10 db, err := tsdb.Open(dir, nil, nil, opts) if err != nil { t.Fatalf("Opening test storage failed: %s", err) } - return &TestStorage{DB: db, dir: dir} + es, err := tsdb.NewCircularExemplarStorage(10, nil) + if err != nil { + t.Fatalf("Opening test exemplar storage failed: %s", err) + } + return &TestStorage{DB: db, exemplarStorage: es, dir: dir} } type TestStorage struct { *tsdb.DB - dir string + exemplarStorage tsdb.ExemplarStorage + dir string } func (s TestStorage) Close() error { @@ -53,3 +62,15 @@ func (s TestStorage) Close() error { } return os.RemoveAll(s.dir) } + +func (s TestStorage) ExemplarAppender() storage.ExemplarAppender { + return s +} + 
+func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable { + return s.exemplarStorage +} + +func (s TestStorage) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + return ref, s.exemplarStorage.AddExemplar(l, e) +} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 9430762cc4..3483c36456 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" @@ -158,8 +159,9 @@ type TSDBAdminStats interface { // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { - Queryable storage.SampleAndChunkQueryable - QueryEngine *promql.Engine + Queryable storage.SampleAndChunkQueryable + QueryEngine *promql.Engine + ExemplarQueryable storage.ExemplarQueryable targetRetriever func(context.Context) TargetRetriever alertmanagerRetriever func(context.Context) AlertmanagerRetriever @@ -185,6 +187,7 @@ type API struct { func init() { jsoniter.RegisterTypeEncoderFunc("promql.Point", marshalPointJSON, marshalPointJSONIsEmpty) + jsoniter.RegisterTypeEncoderFunc("exemplar.Exemplar", marshalExemplarJSON, marshalExemplarJSONEmpty) } // NewAPI returns an initialized API type. 
@@ -192,6 +195,7 @@ func NewAPI( qe *promql.Engine, q storage.SampleAndChunkQueryable, ap storage.Appendable, + eq storage.ExemplarQueryable, tr func(context.Context) TargetRetriever, ar func(context.Context) AlertmanagerRetriever, configFunc func() config.Config, @@ -213,8 +217,9 @@ func NewAPI( registerer prometheus.Registerer, ) *API { a := &API{ - QueryEngine: qe, - Queryable: q, + QueryEngine: qe, + Queryable: q, + ExemplarQueryable: eq, targetRetriever: tr, alertmanagerRetriever: ar, @@ -282,6 +287,8 @@ func (api *API) Register(r *route.Router) { r.Post("/query", wrap(api.query)) r.Get("/query_range", wrap(api.queryRange)) r.Post("/query_range", wrap(api.queryRange)) + r.Get("/query_exemplars", wrap(api.queryExemplars)) + r.Post("/query_exemplars", wrap(api.queryExemplars)) r.Get("/labels", wrap(api.labelNames)) r.Post("/labels", wrap(api.labelNames)) @@ -469,6 +476,44 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) { }, nil, res.Warnings, qry.Close} } +func (api *API) queryExemplars(r *http.Request) apiFuncResult { + start, err := parseTimeParam(r, "start", minTime) + if err != nil { + return invalidParamError(err, "start") + } + end, err := parseTimeParam(r, "end", maxTime) + if err != nil { + return invalidParamError(err, "end") + } + if end.Before(start) { + err := errors.New("end timestamp must not be before start timestamp") + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} + } + + expr, err := parser.ParseExpr(r.FormValue("query")) + if err != nil { + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} + } + + selectors := parser.ExtractSelectors(expr) + if len(selectors) < 1 { + return apiFuncResult{nil, nil, nil, nil} + } + + ctx := r.Context() + eq, err := api.ExemplarQueryable.ExemplarQuerier(ctx) + if err != nil { + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} + } + + res, err := eq.Select(timestamp.FromTime(start), timestamp.FromTime(end), selectors...) 
+ if err != nil { + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} + } + + return apiFuncResult{res, nil, nil, nil} +} + func returnAPIError(err error) *apiError { if err == nil { return nil @@ -1535,12 +1580,60 @@ OUTER: return matcherSets, nil } +// marshalPointJSON writes `[ts, "val"]`. func marshalPointJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) { p := *((*promql.Point)(ptr)) stream.WriteArrayStart() + marshalTimestamp(p.T, stream) + stream.WriteMore() + marshalValue(p.V, stream) + stream.WriteArrayEnd() +} + +func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool { + return false +} + +// marshalExemplarJSON writes. +// { +// labels: , +// value: "", +// timestamp: +// } +func marshalExemplarJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) { + p := *((*exemplar.Exemplar)(ptr)) + stream.WriteObjectStart() + + // "labels" key. + stream.WriteObjectField(`labels`) + lbls, err := p.Labels.MarshalJSON() + if err != nil { + stream.Error = err + return + } + stream.SetBuffer(append(stream.Buffer(), lbls...)) + + // "value" key. + stream.WriteMore() + stream.WriteObjectField(`value`) + marshalValue(p.Value, stream) + + // "timestamp" key. + stream.WriteMore() + stream.WriteObjectField(`timestamp`) + marshalTimestamp(p.Ts, stream) + //marshalTimestamp(p.Ts, stream) + + stream.WriteObjectEnd() +} + +func marshalExemplarJSONEmpty(ptr unsafe.Pointer) bool { + return false +} + +func marshalTimestamp(t int64, stream *jsoniter.Stream) { // Write out the timestamp as a float divided by 1000. // This is ~3x faster than converting to a float. 
- t := p.T if t < 0 { stream.WriteRaw(`-`) t = -t @@ -1557,13 +1650,14 @@ func marshalPointJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) { } stream.WriteInt64(fraction) } - stream.WriteMore() - stream.WriteRaw(`"`) +} +func marshalValue(v float64, stream *jsoniter.Stream) { + stream.WriteRaw(`"`) // Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround // to https://github.com/json-iterator/go/issues/365 (jsoniter, to follow json standard, doesn't allow inf/nan). buf := stream.Buffer() - abs := math.Abs(p.V) + abs := math.Abs(v) fmt := byte('f') // Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right. if abs != 0 { @@ -1571,13 +1665,7 @@ func marshalPointJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) { fmt = 'e' } } - buf = strconv.AppendFloat(buf, p.V, fmt, -1, 64) + buf = strconv.AppendFloat(buf, v, fmt, -1, 64) stream.SetBuffer(buf) - stream.WriteRaw(`"`) - stream.WriteArrayEnd() -} - -func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool { - return false } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 42819236fa..691f6539f0 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -40,6 +40,7 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" @@ -304,6 +305,55 @@ func TestEndpoints(t *testing.T) { test_metric4{foo="boo", dup="1"} 1+0x100 test_metric4{foo="boo"} 1+0x100 `) + + start := time.Unix(0, 0) + exemplars := []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "test_metric3", "foo", "boo", "dup", "1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("id", "abc"), + Value: 10, + Ts: timestamp.FromTime(start.Add(2 * time.Second)), + }, + }, + }, + { + SeriesLabels: 
labels.FromStrings("__name__", "test_metric4", "foo", "bar", "dup", "1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("id", "lul"), + Value: 10, + Ts: timestamp.FromTime(start.Add(4 * time.Second)), + }, + }, + }, + { + SeriesLabels: labels.FromStrings("__name__", "test_metric3", "foo", "boo", "dup", "1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("id", "abc2"), + Value: 10, + Ts: timestamp.FromTime(start.Add(4053 * time.Millisecond)), + }, + }, + }, + { + SeriesLabels: labels.FromStrings("__name__", "test_metric4", "foo", "bar", "dup", "1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("id", "lul2"), + Value: 10, + Ts: timestamp.FromTime(start.Add(4153 * time.Millisecond)), + }, + }, + }, + } + for _, ed := range exemplars { + suite.ExemplarStorage().AppendExemplar(0, ed.SeriesLabels, ed.Exemplars[0]) + require.NoError(t, err, "failed to add exemplar: %+v", ed.Exemplars[0]) + } + require.NoError(t, err) defer suite.Close() @@ -324,6 +374,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: suite.Storage(), QueryEngine: suite.QueryEngine(), + ExemplarQueryable: suite.ExemplarQueryable(), targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), flagsMap: sampleFlagMap, @@ -332,8 +383,7 @@ func TestEndpoints(t *testing.T) { ready: func(f http.HandlerFunc) http.HandlerFunc { return f }, rulesRetriever: algr.toFactory(), } - - testEndpoints(t, api, testTargetRetriever, true) + testEndpoints(t, api, testTargetRetriever, suite.ExemplarStorage(), true) }) // Run all the API tests against a API that is wired to forward queries via @@ -388,6 +438,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: remote, QueryEngine: suite.QueryEngine(), + ExemplarQueryable: suite.ExemplarQueryable(), targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), flagsMap: sampleFlagMap, @@ 
-397,7 +448,7 @@ func TestEndpoints(t *testing.T) { rulesRetriever: algr.toFactory(), } - testEndpoints(t, api, testTargetRetriever, false) + testEndpoints(t, api, testTargetRetriever, suite.ExemplarStorage(), false) }) } @@ -535,7 +586,7 @@ func setupRemote(s storage.Storage) *httptest.Server { return httptest.NewServer(handler) } -func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI bool) { +func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.ExemplarStorage, testLabelAPI bool) { start := time.Unix(0, 0) type targetMetadata struct { @@ -552,6 +603,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI errType errorType sorter func(interface{}) metadata []targetMetadata + exemplars []exemplar.QueryResult } var tests = []test{ @@ -1458,6 +1510,89 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI }, }, }, + { + endpoint: api.queryExemplars, + query: url.Values{ + "query": []string{`test_metric3{foo="boo"} - test_metric4{foo="bar"}`}, + "start": []string{"0"}, + "end": []string{"4"}, + }, + // Note extra integer length of timestamps for exemplars because of millisecond preservation + // of timestamps within Prometheus (see timestamp package). 
+ + response: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "test_metric3", "foo", "boo", "dup", "1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("id", "abc"), + Value: 10, + Ts: timestamp.FromTime(start.Add(2 * time.Second)), + }, + }, + }, + { + SeriesLabels: labels.FromStrings("__name__", "test_metric4", "foo", "bar", "dup", "1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("id", "lul"), + Value: 10, + Ts: timestamp.FromTime(start.Add(4 * time.Second)), + }, + }, + }, + }, + }, + { + endpoint: api.queryExemplars, + query: url.Values{ + "query": []string{`{foo="boo"}`}, + "start": []string{"4"}, + "end": []string{"4.1"}, + }, + response: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "test_metric3", "foo", "boo", "dup", "1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("id", "abc2"), + Value: 10, + Ts: 4053, + }, + }, + }, + }, + }, + { + endpoint: api.queryExemplars, + query: url.Values{ + "query": []string{`{foo="boo"}`}, + }, + response: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "test_metric3", "foo", "boo", "dup", "1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("id", "abc"), + Value: 10, + Ts: 2000, + }, + { + Labels: labels.FromStrings("id", "abc2"), + Value: 10, + Ts: 4053, + }, + }, + }, + }, + }, + { + endpoint: api.queryExemplars, + query: url.Values{ + "query": []string{`{__name__="test_metric5"}`}, + }, + response: []exemplar.QueryResult{}, + }, } if testLabelAPI { @@ -1890,6 +2025,15 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI tr.SetMetadataStoreForTargets(tm.identifier, &testMetaStore{Metadata: tm.metadata}) } + for _, te := range test.exemplars { + for _, e := range te.Exemplars { + _, err := es.AppendExemplar(0, te.SeriesLabels, e) + if err != nil { + t.Fatal(err) + } + } + } + res := test.endpoint(req.WithContext(ctx)) 
assertAPIError(t, res.err, test.errType) @@ -2509,6 +2653,36 @@ func TestRespond(t *testing.T) { response: promql.Point{V: 1.2345678e-67, T: 0}, expected: `{"status":"success","data":[0,"1.2345678e-67"]}`, }, + { + response: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("foo", "bar"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "abc"), + Value: 100.123, + Ts: 1234, + }, + }, + }, + }, + expected: `{"status":"success","data":[{"seriesLabels":{"foo":"bar"},"exemplars":[{"labels":{"traceID":"abc"},"value":"100.123","timestamp":1.234}]}]}`, + }, + { + response: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("foo", "bar"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "abc"), + Value: math.Inf(1), + Ts: 1234, + }, + }, + }, + }, + expected: `{"status":"success","data":[{"seriesLabels":{"foo":"bar"},"exemplars":[{"labels":{"traceID":"abc"},"value":"+Inf","timestamp":1.234}]}]}`, + }, } for _, c := range cases { diff --git a/web/web.go b/web/web.go index 185aeedb75..bdc5c44e5c 100644 --- a/web/web.go +++ b/web/web.go @@ -174,14 +174,15 @@ type Handler struct { gatherer prometheus.Gatherer metrics *metrics - scrapeManager *scrape.Manager - ruleManager *rules.Manager - queryEngine *promql.Engine - lookbackDelta time.Duration - context context.Context - storage storage.Storage - localStorage LocalStorage - notifier *notifier.Manager + scrapeManager *scrape.Manager + ruleManager *rules.Manager + queryEngine *promql.Engine + lookbackDelta time.Duration + context context.Context + storage storage.Storage + localStorage LocalStorage + exemplarStorage storage.ExemplarQueryable + notifier *notifier.Manager apiV1 *api_v1.API @@ -220,6 +221,7 @@ type Options struct { TSDBMaxBytes units.Base2Bytes LocalStorage LocalStorage Storage storage.Storage + ExemplarStorage storage.ExemplarQueryable QueryEngine *promql.Engine LookbackDelta time.Duration ScrapeManager *scrape.Manager @@ -281,14 
+283,15 @@ func New(logger log.Logger, o *Options) *Handler { cwd: cwd, flagsMap: o.Flags, - context: o.Context, - scrapeManager: o.ScrapeManager, - ruleManager: o.RuleManager, - queryEngine: o.QueryEngine, - lookbackDelta: o.LookbackDelta, - storage: o.Storage, - localStorage: o.LocalStorage, - notifier: o.Notifier, + context: o.Context, + scrapeManager: o.ScrapeManager, + ruleManager: o.RuleManager, + queryEngine: o.QueryEngine, + lookbackDelta: o.LookbackDelta, + storage: o.Storage, + localStorage: o.LocalStorage, + exemplarStorage: o.ExemplarStorage, + notifier: o.Notifier, now: model.Now, } @@ -303,7 +306,7 @@ func New(logger log.Logger, o *Options) *Handler { app = h.storage } - h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, app, factoryTr, factoryAr, + h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, app, h.exemplarStorage, factoryTr, factoryAr, func() config.Config { h.mtx.RLock() defer h.mtx.RUnlock()