scrape,api: provide per-target metric metadata

This adds a per-target cache of scraped metadata. The metadata is only
available for the lifecycle of the attached target. An API endpoint allows
selecting metadata by metric name and by a label selection of targets.

Signed-off-by: Fabian Reinartz <freinartz@google.com>
Fabian Reinartz 2018-05-18 03:32:11 -04:00
parent 0e7112330e
commit ad4c33c1ff
6 changed files with 371 additions and 23 deletions

@ -329,7 +329,7 @@ Both the active and dropped targets are part of the response.
```json
$ curl http://localhost:9090/api/v1/targets
{
"status": "success", [3/11]
"status": "success",
"data": {
"activeTargets": [
{
@ -363,6 +363,85 @@ $ curl http://localhost:9090/api/v1/targets
}
```
## Target metadata
The following endpoint returns metadata about metrics currently scraped by targets.
```
GET /api/v1/targets/metadata
```
URL query parameters:
- `match=<series_selector>`: A series selector whose label matchers filter targets by their labels. If the selector includes a metric name, only metadata for that metric is returned.
- `limit=<number>`: Maximum number of targets to match.
The `data` section of the query result consists of a list of objects that
contain metric metadata and the target label set.
The following example returns all metadata entries for the `go_goroutines` metric
from the first two targets with label `job="prometheus"`.
```json
curl -G http://localhost:9091/api/v1/targets/metadata \
--data-urlencode 'match=go_goroutines{job="prometheus"}' \
--data-urlencode 'limit=2'
{
"status": "success",
"data": [
{
"target": {
"instance": "127.0.0.1:9090",
"job": "prometheus"
},
"type": "gauge",
"help": "Number of goroutines that currently exist."
},
{
"target": {
"instance": "127.0.0.1:9091",
"job": "prometheus"
},
"type": "gauge",
"help": "Number of goroutines that currently exist."
}
]
}
```
The following example returns metadata for all metrics of the target with
label `instance="127.0.0.1:9090"`.
```json
curl -G http://localhost:9091/api/v1/targets/metadata \
--data-urlencode 'match={instance="127.0.0.1:9090"}'
{
"status": "success",
"data": [
// ...
{
"target": {
"instance": "127.0.0.1:9090",
"job": "prometheus"
},
"metric": "prometheus_treecache_zookeeper_failures_total",
"type": "counter",
"help": "The total number of ZooKeeper failures."
},
{
"target": {
"instance": "127.0.0.1:9090",
"job": "prometheus"
},
"metric": "prometheus_tsdb_reloads_total",
"type": "counter",
"help": "Number of times the database reloaded block data from disk."
},
// ...
]
}
```
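For programmatic use, the `data` list can be decoded directly. Below is a minimal Go client sketch, assuming only the response shape shown above; the endpoint URL, port, and match expression are illustrative:
```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
)

// metadataEntry mirrors one element of the "data" list shown above.
type metadataEntry struct {
	Target map[string]string `json:"target"`
	Metric string            `json:"metric,omitempty"`
	Type   string            `json:"type"`
	Help   string            `json:"help"`
}

type response struct {
	Status string          `json:"status"`
	Data   []metadataEntry `json:"data"`
}

func main() {
	q := url.Values{}
	q.Set("match", `go_goroutines{job="prometheus"}`)
	q.Set("limit", "2")

	resp, err := http.Get("http://localhost:9091/api/v1/targets/metadata?" + q.Encode())
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var r response
	if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
		log.Fatal(err)
	}
	for _, md := range r.Data {
		fmt.Printf("%s %s: %s (%s)\n", md.Target["instance"], md.Metric, md.Help, md.Type)
	}
}
```
Note that when the selector names a metric, as in the first example, the `metric` field is omitted from each entry, so `Metric` decodes to the empty string.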
## Alertmanagers
The following endpoint returns an overview of the current state of the

@ -33,7 +33,6 @@ type Appendable interface {
// NewManager is the Manager constructor
func NewManager(logger log.Logger, app Appendable) *Manager {
return &Manager{
append: app,
logger: logger,

@ -161,6 +161,10 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
logger: logger,
}
sp.newLoop = func(t *Target, s scraper, limit int, honor bool, mrc []*config.RelabelConfig) loop {
// Point the target's metadata retrieval at a fresh scrape cache for this loop.
cache := newScrapeCache()
t.setMetadataStore(cache)
return newScrapeLoop(
ctx,
s,
@ -175,6 +179,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
}
return appender(app, limit)
},
cache,
)
}
@ -523,43 +528,62 @@ type scrapeCache struct {
// Parsed string to an entry with information about the actual label set
// and its storage reference.
entries map[string]*cacheEntry
series map[string]*cacheEntry
// Cache of dropped metric strings and their iteration. The iteration must
// be a pointer so we can update it without setting a new entry with an unsafe
// string in addDropped().
dropped map[string]*uint64
droppedSeries map[string]*uint64
// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
seriesCur map[uint64]labels.Labels
seriesPrev map[uint64]labels.Labels
metaMtx sync.Mutex
metadata map[string]*metaEntry
}
// metaEntry holds meta information about a metric.
type metaEntry struct {
lastIter uint64 // last scrape iteration the entry was observed
typ textparse.MetricType
help string
}
func newScrapeCache() *scrapeCache {
return &scrapeCache{
entries: map[string]*cacheEntry{},
dropped: map[string]*uint64{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
series: map[string]*cacheEntry{},
droppedSeries: map[string]*uint64{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
metadata: map[string]*metaEntry{},
}
}
func (c *scrapeCache) iterDone() {
// refCache and lsetCache may grow over time through series churn
// All caches may grow over time through series churn
// or multiple string representations of the same metric. Clean up entries
// that haven't appeared in the last scrape.
for s, e := range c.entries {
for s, e := range c.series {
if c.iter-e.lastIter > 2 {
delete(c.entries, s)
delete(c.series, s)
}
}
for s, iter := range c.dropped {
for s, iter := range c.droppedSeries {
if c.iter-*iter > 2 {
delete(c.dropped, s)
delete(c.droppedSeries, s)
}
}
c.metaMtx.Lock()
for m, e := range c.metadata {
// Keep metadata around for 10 scrapes after its metric disappeared.
if c.iter-e.lastIter > 10 {
delete(c.metadata, m)
}
}
c.metaMtx.Unlock()
// Swap current and previous series.
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
@ -573,7 +597,7 @@ func (c *scrapeCache) iterDone() {
}
func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
e, ok := c.entries[met]
e, ok := c.series[met]
if !ok {
return nil, false
}
@ -585,16 +609,16 @@ func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash ui
if ref == 0 {
return
}
c.entries[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
}
func (c *scrapeCache) addDropped(met string) {
iter := c.iter
c.dropped[met] = &iter
c.droppedSeries[met] = &iter
}
func (c *scrapeCache) getDropped(met string) bool {
iterp, ok := c.dropped[met]
iterp, ok := c.droppedSeries[met]
if ok {
*iterp = c.iter
}
@ -615,6 +639,65 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
}
}
func (c *scrapeCache) setType(metric []byte, t textparse.MetricType) {
c.metaMtx.Lock()
e, ok := c.metadata[yoloString(metric)]
if !ok {
e = &metaEntry{typ: textparse.MetricTypeUntyped}
c.metadata[string(metric)] = e
}
e.typ = t
e.lastIter = c.iter
c.metaMtx.Unlock()
}
func (c *scrapeCache) setHelp(metric, help []byte) {
c.metaMtx.Lock()
e, ok := c.metadata[yoloString(metric)]
if !ok {
e = &metaEntry{typ: textparse.MetricTypeUntyped}
c.metadata[string(metric)] = e
}
if e.help != yoloString(help) {
e.help = string(help)
}
e.lastIter = c.iter
c.metaMtx.Unlock()
}
func (c *scrapeCache) getMetadata(metric string) (MetricMetadata, bool) {
c.metaMtx.Lock()
defer c.metaMtx.Unlock()
m, ok := c.metadata[metric]
if !ok {
return MetricMetadata{}, false
}
return MetricMetadata{
Metric: metric,
Type: m.typ,
Help: m.help,
}, true
}
func (c *scrapeCache) listMetadata() (res []MetricMetadata) {
c.metaMtx.Lock()
defer c.metaMtx.Unlock()
for m, e := range c.metadata {
res = append(res, MetricMetadata{
Metric: m,
Type: e.typ,
Help: e.help,
})
}
return res
}
func newScrapeLoop(ctx context.Context,
sc scraper,
l log.Logger,
@ -622,6 +705,7 @@ func newScrapeLoop(ctx context.Context,
sampleMutator labelsMutator,
reportSampleMutator labelsMutator,
appender func() storage.Appender,
cache *scrapeCache,
) *scrapeLoop {
if l == nil {
l = log.NewNopLogger()
@ -629,10 +713,13 @@ func newScrapeLoop(ctx context.Context,
if buffers == nil {
buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
}
if cache == nil {
cache = newScrapeCache()
}
sl := &scrapeLoop{
scraper: sc,
buffers: buffers,
cache: newScrapeCache(),
cache: cache,
appender: appender,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
@ -838,8 +925,16 @@ loop:
}
break
}
if et != textparse.EntrySeries {
switch et {
case textparse.EntryType:
sl.cache.setType(p.Type())
continue
case textparse.EntryHelp:
sl.cache.setHelp(p.Help())
continue
case textparse.EntryComment:
continue
default:
}
total++
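The `setType` and `setHelp` methods above share a deliberate allocation pattern: the metadata map is looked up through `yoloString`, which this package uses as an unsafe zero-copy `[]byte`-to-string view, but keys and values are inserted with `string(...)`, which copies. The common path (metric already known, help unchanged) therefore allocates nothing, while stored entries remain valid after the scrape buffer is reused. A self-contained sketch of the pattern, with a hypothetical `metaCache` standing in for `scrapeCache`:
```go
package main

import (
	"fmt"
	"unsafe"
)

// yoloString reinterprets a byte slice as a string without copying.
// The result is only valid while the underlying bytes are unchanged,
// so it must never be retained as a long-lived map key or value.
func yoloString(b []byte) string {
	return *((*string)(unsafe.Pointer(&b)))
}

// metaCache is a hypothetical stand-in for scrapeCache's metadata map.
type metaCache struct {
	help map[string]string
}

func (c *metaCache) setHelp(metric, help []byte) {
	// Look up with the zero-copy view: no allocation when the metric is
	// already known and its help text is unchanged.
	if cur, ok := c.help[yoloString(metric)]; ok && cur == yoloString(help) {
		return
	}
	// Insert or update with string(...): this copies the bytes, so the
	// stored key and value stay valid after the scrape buffer is reused.
	c.help[string(metric)] = string(help)
}

func main() {
	c := &metaCache{help: map[string]string{}}
	buf := []byte("go_goroutines")
	c.setHelp(buf, []byte("Number of goroutines that currently exist."))
	fmt.Println(c.help["go_goroutines"])
}
```
The real cache stores `metaEntry` values instead of plain strings and also records `lastIter`, which is what lets `iterDone` expire metadata ten scrapes after its metric was last seen.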

@ -37,6 +37,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/storage"
@ -306,7 +307,7 @@ func TestScrapePoolAppender(t *testing.T) {
app := &nopAppendable{}
sp := newScrapePool(cfg, app, nil)
loop := sp.newLoop(nil, nil, 0, false, nil)
loop := sp.newLoop(&Target{}, nil, 0, false, nil)
appl, ok := loop.(*scrapeLoop)
if !ok {
t.Fatalf("Expected scrapeLoop but got %T", loop)
@ -321,7 +322,7 @@ func TestScrapePoolAppender(t *testing.T) {
t.Fatalf("Expected base appender but got %T", tl.Appender)
}
loop = sp.newLoop(nil, nil, 100, false, nil)
loop = sp.newLoop(&Target{}, nil, 100, false, nil)
appl, ok = loop.(*scrapeLoop)
if !ok {
t.Fatalf("Expected scrapeLoop but got %T", loop)
@ -387,7 +388,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
nil, nil,
nopMutator,
nopMutator,
nil,
nil, nil,
)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
@ -450,6 +451,7 @@ func TestScrapeLoopStop(t *testing.T) {
nopMutator,
nopMutator,
app,
nil,
)
// Terminate loop after 2 scrapes.
@ -514,6 +516,7 @@ func TestScrapeLoopRun(t *testing.T) {
nopMutator,
nopMutator,
app,
nil,
)
// The loop must terminate during the initial offset if the context
@ -558,6 +561,7 @@ func TestScrapeLoopRun(t *testing.T) {
nopMutator,
nopMutator,
app,
nil,
)
go func() {
@ -590,6 +594,51 @@ func TestScrapeLoopRun(t *testing.T) {
}
}
func TestScrapeLoopMetadata(t *testing.T) {
var (
signal = make(chan struct{})
scraper = &testScraper{}
cache = newScrapeCache()
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return nopAppender{} },
cache,
)
defer cancel()
total, _, err := sl.append([]byte(`
# TYPE test_metric counter
# HELP test_metric some help text
# other comment
test_metric 1
# TYPE test_metric_no_help gauge
# HELP test_metric_no_type other help text`), time.Now())
testutil.Ok(t, err)
testutil.Equals(t, 1, total)
md, ok := cache.getMetadata("test_metric")
testutil.Assert(t, ok, "expected metadata to be present")
testutil.Assert(t, textparse.MetricTypeCounter == md.Type, "unexpected metric type")
testutil.Equals(t, "some help text", md.Help)
md, ok = cache.getMetadata("test_metric_no_help")
testutil.Assert(t, ok, "expected metadata to be present")
testutil.Assert(t, textparse.MetricTypeGauge == md.Type, "unexpected metric type")
testutil.Equals(t, "", md.Help)
md, ok = cache.getMetadata("test_metric_no_type")
testutil.Assert(t, ok, "expected metadata to be present")
testutil.Assert(t, textparse.MetricTypeUntyped == md.Type, "unexpected metric type")
testutil.Equals(t, "other help text", md.Help)
}
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
appender := &collectResultAppender{}
var (
@ -606,6 +655,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
nopMutator,
nopMutator,
app,
nil,
)
// Succeed once, several failures, then stop.
numScrapes := 0
@ -663,6 +713,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
nopMutator,
nopMutator,
app,
nil,
)
// Succeed once, several failures, then stop.
@ -766,6 +817,7 @@ func TestScrapeLoopAppend(t *testing.T) {
return mutateReportSampleLabels(l, discoveryLabels)
},
func() storage.Appender { return app },
nil,
)
now := time.Now()
@ -804,6 +856,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
nopMutator,
nopMutator,
func() storage.Appender { return app },
nil,
)
// Get the value of the Counter before performing the append.
@ -863,6 +916,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
nopMutator,
nopMutator,
func() storage.Appender { return capp },
nil,
)
now := time.Now()
@ -901,6 +955,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
nopMutator,
nopMutator,
func() storage.Appender { return app },
nil,
)
now := time.Now()
@ -945,6 +1000,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
nopMutator,
nopMutator,
func() storage.Appender { return app },
nil,
)
now := time.Now()
@ -983,6 +1039,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
nopMutator,
nopMutator,
app,
nil,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -1011,6 +1068,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
nopMutator,
nopMutator,
app,
nil,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -1056,6 +1114,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
nopMutator,
nopMutator,
func() storage.Appender { return app },
nil,
)
now := time.Unix(1, 0)
@ -1088,6 +1147,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)),
}
},
nil,
)
now := time.Now().Add(20 * time.Minute)

@ -29,6 +29,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/storage"
)
@ -56,6 +57,7 @@ type Target struct {
lastError error
lastScrape time.Time
health TargetHealth
metadata metricMetadataStore
}
// NewTarget creates a reasonably configured target for querying.
@ -72,6 +74,45 @@ func (t *Target) String() string {
return t.URL().String()
}
type metricMetadataStore interface {
listMetadata() []MetricMetadata
getMetadata(metric string) (MetricMetadata, bool)
}
// MetricMetadata is a piece of metadata for a metric.
type MetricMetadata struct {
Metric string
Type textparse.MetricType
Help string
}
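// MetadataList returns all metric metadata currently known for the target.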
func (t *Target) MetadataList() []MetricMetadata {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.metadata == nil {
return nil
}
return t.metadata.listMetadata()
}
// Metadata returns type and help metadata for the given metric.
func (t *Target) Metadata(metric string) (MetricMetadata, bool) {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.metadata == nil {
return MetricMetadata{}, false
}
return t.metadata.getMetadata(metric)
}
func (t *Target) setMetadataStore(s metricMetadataStore) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.metadata = s
}
// hash returns an identifying hash for the target.
func (t *Target) hash() uint64 {
h := fnv.New64a()

@ -35,6 +35,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/promql"
@ -63,6 +64,7 @@ const (
errorBadData errorType = "bad_data"
errorInternal errorType = "internal"
errorUnavailable errorType = "unavailable"
errorNotFound errorType = "not_found"
)
var corsHeaders = map[string]string{
@ -186,6 +188,7 @@ func (api *API) Register(r *route.Router) {
r.Del("/series", wrap(api.dropSeries))
r.Get("/targets", wrap(api.targets))
r.Get("/targets/metadata", wrap(api.targetMetadata))
r.Get("/alertmanagers", wrap(api.alertmanagers))
r.Get("/status/config", wrap(api.serveConfig))
@ -461,7 +464,6 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) {
res := &TargetDiscovery{ActiveTargets: make([]*Target, len(tActive)), DroppedTargets: make([]*DroppedTarget, len(tDropped))}
for i, t := range tActive {
lastErrStr := ""
lastErr := t.LastError()
if lastErr != nil {
@ -486,6 +488,76 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) {
return res, nil, nil
}
func (api *API) targetMetadata(r *http.Request) (interface{}, *apiError) {
limit := -1
if s := r.FormValue("limit"); s != "" {
var err error
if limit, err = strconv.Atoi(s); err != nil {
return nil, &apiError{errorBadData, fmt.Errorf("limit must be a number")}
}
}
matchers, err := promql.ParseMetricSelector(r.FormValue("match"))
if err != nil {
return nil, &apiError{errorBadData, err}
}
var metric string
for i, m := range matchers {
// Extract metric matcher.
if m.Name == labels.MetricName && m.Type == labels.MatchEqual {
metric = m.Value
matchers = append(matchers[:i], matchers[i+1:]...)
break
}
}
var res []metricMetadata
Outer:
for _, t := range api.targetRetriever.TargetsActive() {
if limit >= 0 && len(res) >= limit {
break
}
for _, m := range matchers {
// Filter targets that don't satisfy the label matchers.
if !m.Matches(t.Labels().Get(m.Name)) {
continue Outer
}
}
// If no metric is specified, get the full list for the target.
if metric == "" {
for _, md := range t.MetadataList() {
res = append(res, metricMetadata{
Target: t.Labels(),
Metric: md.Metric,
Type: md.Type,
Help: md.Help,
})
}
continue
}
// Get metadata for the specified metric.
if md, ok := t.Metadata(metric); ok {
res = append(res, metricMetadata{
Target: t.Labels(),
Type: md.Type,
Help: md.Help,
})
}
}
if len(res) == 0 {
return nil, &apiError{errorNotFound, errors.New("specified metadata not found")}
}
return res, nil
}
type metricMetadata struct {
Target labels.Labels `json:"target"`
Metric string `json:"metric,omitempty"`
Type textparse.MetricType `json:"type"`
Help string `json:"help"`
}
// AlertmanagerDiscovery has all the active Alertmanagers.
type AlertmanagerDiscovery struct {
ActiveAlertmanagers []*AlertmanagerTarget `json:"activeAlertmanagers"`
@ -783,6 +855,8 @@ func respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) {
code = http.StatusServiceUnavailable
case errorInternal:
code = http.StatusInternalServerError
case errorNotFound:
code = http.StatusNotFound
default:
code = http.StatusInternalServerError
}