Exemplar resize (#8974)

* Create experimental circular buffer resize method, benchmarks

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Optimize exemplar resize to only replay as many exemplars as needed

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* More comments, benchmark AddExemplar

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* optimizations

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* comment

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Slight refactor of resize benchmark + make use of resize via runtime
reloadable storage config.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Some more config related changes.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address some review comments.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address more review comments.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Refactor to remove usage of noopExemplarStorage and avoid race condition
when resizing from Head code.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix or add comments to clarify some of the new behaviour.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* fix potential panics related to negative exemplar buffer lengths

Signed-off-by: Callum Styan <callumstyan@gmail.com>

Co-authored-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Martin Disibio 2021-07-20 00:52:57 -04:00 committed by GitHub
parent a1c1313b3c
commit 1bcd13d6b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 475 additions and 132 deletions

View file

@ -131,9 +131,6 @@ type flagConfig struct {
// setFeatureListOptions sets the corresponding options from the featureList. // setFeatureListOptions sets the corresponding options from the featureList.
func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
maxExemplars := c.tsdb.MaxExemplars
// Disabled at first. Value from the flag is used if exemplar-storage is set.
c.tsdb.MaxExemplars = 0
for _, f := range c.featureList { for _, f := range c.featureList {
opts := strings.Split(f, ",") opts := strings.Split(f, ",")
for _, o := range opts { for _, o := range opts {
@ -151,8 +148,8 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
c.enableExpandExternalLabels = true c.enableExpandExternalLabels = true
level.Info(logger).Log("msg", "Experimental expand-external-labels enabled") level.Info(logger).Log("msg", "Experimental expand-external-labels enabled")
case "exemplar-storage": case "exemplar-storage":
c.tsdb.MaxExemplars = maxExemplars c.tsdb.EnableExemplarStorage = true
level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled", "maxExemplars", maxExemplars) level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled")
case "": case "":
continue continue
default: default:
@ -283,9 +280,6 @@ func main() {
a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well. 1MB as recommended by protobuf by default."). a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well. 1MB as recommended by protobuf by default.").
Default("1048576").IntVar(&cfg.web.RemoteReadBytesInFrame) Default("1048576").IntVar(&cfg.web.RemoteReadBytesInFrame)
a.Flag("storage.exemplars.exemplars-limit", "[EXPERIMENTAL] Maximum number of exemplars to store in in-memory exemplar storage total. 0 disables the exemplar storage. This flag is effective only with --enable-feature=exemplar-storage.").
Default("100000").IntVar(&cfg.tsdb.MaxExemplars)
a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert.").
Default("1h").SetValue(&cfg.outageTolerance) Default("1h").SetValue(&cfg.outageTolerance)
@ -352,10 +346,18 @@ func main() {
} }
// Throw error for invalid config before starting other components. // Throw error for invalid config before starting other components.
if _, err := config.LoadFile(cfg.configFile, false, log.NewNopLogger()); err != nil { var cfgFile *config.Config
if cfgFile, err = config.LoadFile(cfg.configFile, false, log.NewNopLogger()); err != nil {
level.Error(logger).Log("msg", fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "err", err) level.Error(logger).Log("msg", fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "err", err)
os.Exit(2) os.Exit(2)
} }
if cfg.tsdb.EnableExemplarStorage {
if cfgFile.StorageConfig.ExemplarsConfig == nil {
cfgFile.StorageConfig.ExemplarsConfig = &config.DefaultExemplarsConfig
}
cfg.tsdb.MaxExemplars = int64(cfgFile.StorageConfig.ExemplarsConfig.MaxExemplars)
}
// Now that the validity of the config is established, set the config // Now that the validity of the config is established, set the config
// success metrics accordingly, although the config isn't really loaded // success metrics accordingly, although the config isn't really loaded
// yet. This will happen later (including setting these metrics again), // yet. This will happen later (including setting these metrics again),
@ -549,6 +551,9 @@ func main() {
reloaders := []reloader{ reloaders := []reloader{
{ {
name: "db_storage",
reloader: localStorage.ApplyConfig,
}, {
name: "remote_storage", name: "remote_storage",
reloader: remoteStorage.ApplyConfig, reloader: remoteStorage.ApplyConfig,
}, { }, {
@ -750,11 +755,11 @@ func main() {
for { for {
select { select {
case <-hup: case <-hup:
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err) level.Error(logger).Log("msg", "Error reloading config", "err", err)
} }
case rc := <-webHandler.Reload(): case rc := <-webHandler.Reload():
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err) level.Error(logger).Log("msg", "Error reloading config", "err", err)
rc <- err rc <- err
} else { } else {
@ -786,7 +791,7 @@ func main() {
return nil return nil
} }
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
return errors.Wrapf(err, "error loading config from %q", cfg.configFile) return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
} }
@ -975,7 +980,7 @@ type reloader struct {
reloader func(*config.Config) error reloader func(*config.Config) error
} }
func reloadConfig(filename string, expandExternalLabels bool, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...reloader) (err error) { func reloadConfig(filename string, expandExternalLabels bool, enableExemplarStorage bool, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...reloader) (err error) {
start := time.Now() start := time.Now()
timings := []interface{}{} timings := []interface{}{}
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
@ -994,6 +999,12 @@ func reloadConfig(filename string, expandExternalLabels bool, logger log.Logger,
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename) return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
} }
if enableExemplarStorage {
if conf.StorageConfig.ExemplarsConfig == nil {
conf.StorageConfig.ExemplarsConfig = &config.DefaultExemplarsConfig
}
}
failed := false failed := false
for _, rl := range rls { for _, rl := range rls {
rstart := time.Now() rstart := time.Now()
@ -1099,6 +1110,11 @@ type readyStorage struct {
stats *tsdb.DBStats stats *tsdb.DBStats
} }
func (s *readyStorage) ApplyConfig(conf *config.Config) error {
db := s.get()
return db.ApplyConfig(conf)
}
// Set the storage. // Set the storage.
func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) { func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
s.mtx.Lock() s.mtx.Lock()
@ -1274,7 +1290,8 @@ type tsdbOptions struct {
StripeSize int StripeSize int
MinBlockDuration model.Duration MinBlockDuration model.Duration
MaxBlockDuration model.Duration MaxBlockDuration model.Duration
MaxExemplars int EnableExemplarStorage bool
MaxExemplars int64
} }
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@ -1289,6 +1306,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
StripeSize: opts.StripeSize, StripeSize: opts.StripeSize,
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
EnableExemplarStorage: opts.EnableExemplarStorage,
MaxExemplars: opts.MaxExemplars, MaxExemplars: opts.MaxExemplars,
} }
} }

View file

@ -183,6 +183,15 @@ var (
RemoteTimeout: model.Duration(1 * time.Minute), RemoteTimeout: model.Duration(1 * time.Minute),
HTTPClientConfig: config.DefaultHTTPClientConfig, HTTPClientConfig: config.DefaultHTTPClientConfig,
} }
// DefaultStorageConfig is the default TSDB/Exemplar storage configuration.
DefaultStorageConfig = StorageConfig{
ExemplarsConfig: &DefaultExemplarsConfig,
}
DefaultExemplarsConfig = ExemplarsConfig{
MaxExemplars: 100000,
}
) )
// Config is the top-level configuration for Prometheus's config files. // Config is the top-level configuration for Prometheus's config files.
@ -191,6 +200,7 @@ type Config struct {
AlertingConfig AlertingConfig `yaml:"alerting,omitempty"` AlertingConfig AlertingConfig `yaml:"alerting,omitempty"`
RuleFiles []string `yaml:"rule_files,omitempty"` RuleFiles []string `yaml:"rule_files,omitempty"`
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"` ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
StorageConfig StorageConfig `yaml:"storage,omitempty"`
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"` RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"` RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
@ -464,6 +474,18 @@ func (c *ScrapeConfig) MarshalYAML() (interface{}, error) {
return discovery.MarshalYAMLWithInlineConfigs(c) return discovery.MarshalYAMLWithInlineConfigs(c)
} }
// StorageConfig configures runtime reloadable configuration options.
type StorageConfig struct {
ExemplarsConfig *ExemplarsConfig `yaml:"exemplars,omitempty"`
}
// ExemplarsConfig configures runtime reloadable configuration options.
type ExemplarsConfig struct {
// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
// Use a value of 0 or less than 0 to disable the storage without having to restart Prometheus.
MaxExemplars int64 `yaml:"max_exemplars,omitempty"`
}
// AlertingConfig configures alerting and alertmanager related configs. // AlertingConfig configures alerting and alertmanager related configs.
type AlertingConfig struct { type AlertingConfig struct {
AlertRelabelConfigs []*relabel.Config `yaml:"alert_relabel_configs,omitempty"` AlertRelabelConfigs []*relabel.Config `yaml:"alert_relabel_configs,omitempty"`

View file

@ -33,6 +33,7 @@ var (
ErrOutOfOrderExemplar = errors.New("out of order exemplar") ErrOutOfOrderExemplar = errors.New("out of order exemplar")
ErrDuplicateExemplar = errors.New("duplicate exemplar") ErrDuplicateExemplar = errors.New("duplicate exemplar")
ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength)
ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0")
) )
// Appendable allows creating appenders. // Appendable allows creating appenders.

View file

@ -35,6 +35,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
@ -147,9 +148,12 @@ type Options struct {
// mainly meant for external users who import TSDB. // mainly meant for external users who import TSDB.
BlocksToDelete BlocksToDeleteFunc BlocksToDelete BlocksToDeleteFunc
// Enables the in memory exemplar storage,.
EnableExemplarStorage bool
// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage. // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage.
MaxExemplars int MaxExemplars int64
} }
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@ -716,7 +720,8 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize
headOpts.StripeSize = opts.StripeSize headOpts.StripeSize = opts.StripeSize
headOpts.SeriesCallback = opts.SeriesLifecycleCallback headOpts.SeriesCallback = opts.SeriesLifecycleCallback
headOpts.NumExemplars = opts.MaxExemplars headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
headOpts.MaxExemplars.Store(opts.MaxExemplars)
db.head, err = NewHead(r, l, wlog, headOpts, stats.Head) db.head, err = NewHead(r, l, wlog, headOpts, stats.Head)
if err != nil { if err != nil {
return nil, err return nil, err
@ -838,6 +843,10 @@ func (db *DB) Appender(ctx context.Context) storage.Appender {
return dbAppender{db: db, Appender: db.head.Appender(ctx)} return dbAppender{db: db, Appender: db.head.Appender(ctx)}
} }
func (db *DB) ApplyConfig(conf *config.Config) error {
return db.head.ApplyConfig(conf)
}
// dbAppender wraps the DB's head appender and triggers compactions on commit // dbAppender wraps the DB's head appender and triggers compactions on commit
// if necessary. // if necessary.
type dbAppender struct { type dbAppender struct {

View file

@ -20,21 +20,20 @@ import (
"unicode/utf8" "unicode/utf8"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
type CircularExemplarStorage struct { // Indicates that there is no index entry for an exmplar.
exemplarsAppended prometheus.Counter const noExemplar = -1
exemplarsInStorage prometheus.Gauge
seriesWithExemplarsInStorage prometheus.Gauge
lastExemplarsTs prometheus.Gauge
outOfOrderExemplars prometheus.Counter
type CircularExemplarStorage struct {
lock sync.RWMutex lock sync.RWMutex
exemplars []*circularBufferEntry exemplars []*circularBufferEntry
nextIndex int nextIndex int
metrics *ExemplarMetrics
// Map of series labels as a string to index entry, which points to the first // Map of series labels as a string to index entry, which points to the first
// and last exemplar for the series in the exemplars circular buffer. // and last exemplar for the series in the exemplars circular buffer.
@ -53,17 +52,17 @@ type circularBufferEntry struct {
ref *indexEntry ref *indexEntry
} }
// NewCircularExemplarStorage creates an circular in memory exemplar storage. type ExemplarMetrics struct {
// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in exemplarsAppended prometheus.Counter
// 1GB of extra memory, accounting for the fact that this is heap allocated space. exemplarsInStorage prometheus.Gauge
// If len < 1, then the exemplar storage is disabled. seriesWithExemplarsInStorage prometheus.Gauge
func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarStorage, error) { lastExemplarsTs prometheus.Gauge
if len < 1 { maxExemplars prometheus.Gauge
return &noopExemplarStorage{}, nil outOfOrderExemplars prometheus.Counter
} }
c := &CircularExemplarStorage{
exemplars: make([]*circularBufferEntry, len), func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics {
index: make(map[string]*indexEntry), m := ExemplarMetrics{
exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{ exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_exemplar_exemplars_appended_total", Name: "prometheus_tsdb_exemplar_exemplars_appended_total",
Help: "Total number of appended exemplars.", Help: "Total number of appended exemplars.",
@ -86,20 +85,51 @@ func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarSto
Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total", Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total",
Help: "Total number of out of order exemplar ingestion failed attempts.", Help: "Total number of out of order exemplar ingestion failed attempts.",
}), }),
maxExemplars: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_exemplar_max_exemplars",
Help: "Total number of exemplars the exemplar storage can store, resizeable.",
}),
} }
if reg != nil { if reg != nil {
reg.MustRegister( reg.MustRegister(
c.exemplarsAppended, m.exemplarsAppended,
c.exemplarsInStorage, m.exemplarsInStorage,
c.seriesWithExemplarsInStorage, m.seriesWithExemplarsInStorage,
c.lastExemplarsTs, m.lastExemplarsTs,
c.outOfOrderExemplars, m.outOfOrderExemplars,
m.maxExemplars,
) )
} }
return &m
}
// NewCircularExemplarStorage creates an circular in memory exemplar storage.
// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
// 1GB of extra memory, accounting for the fact that this is heap allocated space.
// If len <= 0, then the exemplar storage is essentially a noop storage but can later be
// resized to store exemplars.
func NewCircularExemplarStorage(len int64, m *ExemplarMetrics) (ExemplarStorage, error) {
if len < 0 {
len = 0
}
c := &CircularExemplarStorage{
exemplars: make([]*circularBufferEntry, len),
index: make(map[string]*indexEntry),
metrics: m,
}
c.metrics.maxExemplars.Set(float64(len))
return c, nil return c, nil
} }
func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error {
ce.Resize(cfg.StorageConfig.ExemplarsConfig.MaxExemplars)
return nil
}
func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage { func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage {
return ce return ce
} }
@ -116,6 +146,10 @@ func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQ
func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
ret := make([]exemplar.QueryResult, 0) ret := make([]exemplar.QueryResult, 0)
if len(ce.exemplars) <= 0 {
return ret, nil
}
ce.lock.RLock() ce.lock.RLock()
defer ce.lock.RUnlock() defer ce.lock.RUnlock()
@ -136,7 +170,7 @@ func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*label
if e.exemplar.Ts >= start { if e.exemplar.Ts >= start {
se.Exemplars = append(se.Exemplars, e.exemplar) se.Exemplars = append(se.Exemplars, e.exemplar)
} }
if e.next == -1 { if e.next == noExemplar {
break break
} }
e = ce.exemplars[e.next] e = ce.exemplars[e.next]
@ -179,6 +213,10 @@ func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.
// Not thread safe. The append parameters tells us whether this is an external validation, or internal // Not thread safe. The append parameters tells us whether this is an external validation, or internal
// as a result of an AddExemplar call, in which case we should update any relevant metrics. // as a result of an AddExemplar call, in which case we should update any relevant metrics.
func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exemplar, append bool) error { func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exemplar, append bool) error {
if len(ce.exemplars) <= 0 {
return storage.ErrExemplarsDisabled
}
// Exemplar label length does not include chars involved in text rendering such as quotes // Exemplar label length does not include chars involved in text rendering such as quotes
// equals sign, or commas. See definition of const ExemplarMaxLabelLength. // equals sign, or commas. See definition of const ExemplarMaxLabelLength.
labelSetLen := 0 labelSetLen := 0
@ -204,14 +242,92 @@ func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exempla
if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts { if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts {
if append { if append {
ce.outOfOrderExemplars.Inc() ce.metrics.outOfOrderExemplars.Inc()
} }
return storage.ErrOutOfOrderExemplar return storage.ErrOutOfOrderExemplar
} }
return nil return nil
} }
// Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it.
// Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
func (ce *CircularExemplarStorage) Resize(l int64) int {
// Accept negative values as just 0 size.
if l <= 0 {
l = 0
}
if l == int64(len(ce.exemplars)) {
return 0
}
ce.lock.Lock()
defer ce.lock.Unlock()
oldBuffer := ce.exemplars
oldNextIndex := int64(ce.nextIndex)
ce.exemplars = make([]*circularBufferEntry, l)
ce.index = make(map[string]*indexEntry)
ce.nextIndex = 0
// Replay as many entries as needed, starting with oldest first.
count := int64(len(oldBuffer))
if l < count {
count = l
}
migrated := 0
if l > 0 {
// Rewind previous next index by count with wrap-around.
// This math is essentially looking at nextIndex, where we would write the next exemplar to,
// and find the index in the old exemplar buffer that we should start migrating exemplars from.
// This way we don't migrate exemplars that would just be overwritten when migrating later exemplars.
var startIndex int64 = (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer))
for i := int64(0); i < count; i++ {
idx := (startIndex + i) % int64(len(oldBuffer))
if entry := oldBuffer[idx]; entry != nil {
ce.migrate(entry)
migrated++
}
}
}
ce.computeMetrics()
ce.metrics.maxExemplars.Set(float64(l))
return migrated
}
// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
// external lock and does not compute metrics.
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) {
seriesLabels := entry.ref.seriesLabels.String()
idx, ok := ce.index[seriesLabels]
if !ok {
idx = entry.ref
idx.oldest = ce.nextIndex
ce.index[seriesLabels] = idx
} else {
entry.ref = idx
ce.exemplars[idx.newest].next = ce.nextIndex
}
idx.newest = ce.nextIndex
entry.next = noExemplar
ce.exemplars[ce.nextIndex] = entry
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
}
func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
if len(ce.exemplars) <= 0 {
return storage.ErrExemplarsDisabled
}
seriesLabels := l.String() seriesLabels := l.String()
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. // TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
@ -241,7 +357,7 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
// There exists exemplar already on this ce.nextIndex entry, drop it, to make place // There exists exemplar already on this ce.nextIndex entry, drop it, to make place
// for others. // for others.
prevLabels := prev.ref.seriesLabels.String() prevLabels := prev.ref.seriesLabels.String()
if prev.next == -1 { if prev.next == noExemplar {
// Last item for this series, remove index entry. // Last item for this series, remove index entry.
delete(ce.index, prevLabels) delete(ce.index, prevLabels)
} else { } else {
@ -251,43 +367,36 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select) // Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
// since this is the first exemplar stored for this series. // since this is the first exemplar stored for this series.
ce.exemplars[ce.nextIndex].next = noExemplar
ce.exemplars[ce.nextIndex].exemplar = e ce.exemplars[ce.nextIndex].exemplar = e
ce.exemplars[ce.nextIndex].next = -1
ce.exemplars[ce.nextIndex].ref = ce.index[seriesLabels] ce.exemplars[ce.nextIndex].ref = ce.index[seriesLabels]
ce.index[seriesLabels].newest = ce.nextIndex ce.index[seriesLabels].newest = ce.nextIndex
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
ce.exemplarsAppended.Inc() ce.metrics.exemplarsAppended.Inc()
ce.seriesWithExemplarsInStorage.Set(float64(len(ce.index))) ce.computeMetrics()
return nil
}
func (ce *CircularExemplarStorage) computeMetrics() {
ce.metrics.seriesWithExemplarsInStorage.Set(float64(len(ce.index)))
if len(ce.exemplars) == 0 {
ce.metrics.exemplarsInStorage.Set(float64(0))
ce.metrics.lastExemplarsTs.Set(float64(0))
return
}
if next := ce.exemplars[ce.nextIndex]; next != nil { if next := ce.exemplars[ce.nextIndex]; next != nil {
ce.exemplarsInStorage.Set(float64(len(ce.exemplars))) ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars)))
ce.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000) ce.metrics.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000)
return nil return
} }
// We did not yet fill the buffer. // We did not yet fill the buffer.
ce.exemplarsInStorage.Set(float64(ce.nextIndex)) ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex))
ce.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000) if ce.exemplars[0] != nil {
return nil ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000)
} }
type noopExemplarStorage struct{}
func (noopExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error {
return nil
}
func (noopExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error {
return nil
}
func (noopExemplarStorage) ExemplarQuerier(context.Context) (storage.ExemplarQuerier, error) {
return &noopExemplarQuerier{}, nil
}
type noopExemplarQuerier struct{}
func (noopExemplarQuerier) Select(_, _ int64, _ ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
return nil, nil
} }

View file

@ -14,6 +14,9 @@
package tsdb package tsdb
import ( import (
"context"
"fmt"
"math"
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
@ -21,14 +24,17 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
var eMetrics = NewExemplarMetrics(prometheus.DefaultRegisterer)
// Tests the same exemplar cases as AddExemplar, but specifically the ValidateExemplar function so it can be relied on externally. // Tests the same exemplar cases as AddExemplar, but specifically the ValidateExemplar function so it can be relied on externally.
func TestValidateExemplar(t *testing.T) { func TestValidateExemplar(t *testing.T) {
exs, err := NewCircularExemplarStorage(2, nil) exs, err := NewCircularExemplarStorage(2, eMetrics)
require.NoError(t, err) require.NoError(t, err)
es := exs.(*CircularExemplarStorage) es := exs.(*CircularExemplarStorage)
@ -87,7 +93,7 @@ func TestValidateExemplar(t *testing.T) {
} }
func TestAddExemplar(t *testing.T) { func TestAddExemplar(t *testing.T) {
exs, err := NewCircularExemplarStorage(2, nil) exs, err := NewCircularExemplarStorage(2, eMetrics)
require.NoError(t, err) require.NoError(t, err)
es := exs.(*CircularExemplarStorage) es := exs.(*CircularExemplarStorage)
@ -150,7 +156,7 @@ func TestStorageOverflow(t *testing.T) {
// Test that circular buffer index and assignment // Test that circular buffer index and assignment
// works properly, adding more exemplars than can // works properly, adding more exemplars than can
// be stored and then querying for them. // be stored and then querying for them.
exs, err := NewCircularExemplarStorage(5, nil) exs, err := NewCircularExemplarStorage(5, eMetrics)
require.NoError(t, err) require.NoError(t, err)
es := exs.(*CircularExemplarStorage) es := exs.(*CircularExemplarStorage)
@ -185,7 +191,7 @@ func TestStorageOverflow(t *testing.T) {
} }
func TestSelectExemplar(t *testing.T) { func TestSelectExemplar(t *testing.T) {
exs, err := NewCircularExemplarStorage(5, nil) exs, err := NewCircularExemplarStorage(5, eMetrics)
require.NoError(t, err) require.NoError(t, err)
es := exs.(*CircularExemplarStorage) es := exs.(*CircularExemplarStorage)
@ -216,7 +222,7 @@ func TestSelectExemplar(t *testing.T) {
} }
func TestSelectExemplar_MultiSeries(t *testing.T) { func TestSelectExemplar_MultiSeries(t *testing.T) {
exs, err := NewCircularExemplarStorage(5, nil) exs, err := NewCircularExemplarStorage(5, eMetrics)
require.NoError(t, err) require.NoError(t, err)
es := exs.(*CircularExemplarStorage) es := exs.(*CircularExemplarStorage)
@ -273,8 +279,8 @@ func TestSelectExemplar_MultiSeries(t *testing.T) {
} }
func TestSelectExemplar_TimeRange(t *testing.T) { func TestSelectExemplar_TimeRange(t *testing.T) {
lenEs := 5 var lenEs int64 = 5
exs, err := NewCircularExemplarStorage(lenEs, nil) exs, err := NewCircularExemplarStorage(lenEs, eMetrics)
require.NoError(t, err) require.NoError(t, err)
es := exs.(*CircularExemplarStorage) es := exs.(*CircularExemplarStorage)
@ -282,7 +288,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) {
{Name: "service", Value: "asdf"}, {Name: "service", Value: "asdf"},
} }
for i := 0; i < lenEs; i++ { for i := 0; int64(i) < lenEs; i++ {
err := es.AddExemplar(l, exemplar.Exemplar{ err := es.AddExemplar(l, exemplar.Exemplar{
Labels: labels.Labels{ Labels: labels.Labels{
labels.Label{ labels.Label{
@ -308,7 +314,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) {
// Test to ensure that even though a series matches more than one matcher from the // Test to ensure that even though a series matches more than one matcher from the
// query that it's exemplars are only included in the result a single time. // query that it's exemplars are only included in the result a single time.
func TestSelectExemplar_DuplicateSeries(t *testing.T) { func TestSelectExemplar_DuplicateSeries(t *testing.T) {
exs, err := NewCircularExemplarStorage(4, nil) exs, err := NewCircularExemplarStorage(4, eMetrics)
require.NoError(t, err) require.NoError(t, err)
es := exs.(*CircularExemplarStorage) es := exs.(*CircularExemplarStorage)
@ -349,7 +355,7 @@ func TestSelectExemplar_DuplicateSeries(t *testing.T) {
} }
func TestIndexOverwrite(t *testing.T) { func TestIndexOverwrite(t *testing.T) {
exs, err := NewCircularExemplarStorage(2, nil) exs, err := NewCircularExemplarStorage(2, eMetrics)
require.NoError(t, err) require.NoError(t, err)
es := exs.(*CircularExemplarStorage) es := exs.(*CircularExemplarStorage)
@ -380,3 +386,162 @@ func TestIndexOverwrite(t *testing.T) {
i := es.index[l2.String()] i := es.index[l2.String()]
require.Equal(t, &indexEntry{0, 0, l2}, i) require.Equal(t, &indexEntry{0, 0, l2}, i)
} }
func TestResize(t *testing.T) {
testCases := []struct {
name string
startSize int64
newCount int64
expectedSeries []int
notExpectedSeries []int
expectedMigrated int
}{
{
name: "Grow",
startSize: 100,
newCount: 200,
expectedSeries: []int{99, 98, 1, 0},
notExpectedSeries: []int{100},
expectedMigrated: 100,
},
{
name: "Shrink",
startSize: 100,
newCount: 50,
expectedSeries: []int{99, 98, 50},
notExpectedSeries: []int{49, 1, 0},
expectedMigrated: 50,
},
{
name: "Zero",
startSize: 100,
newCount: 0,
expectedSeries: []int{},
notExpectedSeries: []int{},
expectedMigrated: 0,
},
{
name: "Negative",
startSize: 100,
newCount: -1,
expectedSeries: []int{},
notExpectedSeries: []int{},
expectedMigrated: 0,
},
{
name: "NegativeToNegative",
startSize: -1,
newCount: -2,
expectedSeries: []int{},
notExpectedSeries: []int{},
expectedMigrated: 0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics)
require.NoError(t, err)
es := exs.(*CircularExemplarStorage)
for i := 0; int64(i) < tc.startSize; i++ {
err = es.AddExemplar(labels.FromStrings("service", strconv.Itoa(i)), exemplar.Exemplar{
Value: float64(i),
Ts: int64(i)})
require.NoError(t, err)
}
resized := es.Resize(tc.newCount)
require.Equal(t, tc.expectedMigrated, resized)
q, err := es.Querier(context.TODO())
require.NoError(t, err)
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "service", "")}
for _, expected := range tc.expectedSeries {
matchers[0].Value = strconv.Itoa(expected)
ex, err := q.Select(0, math.MaxInt64, matchers)
require.NoError(t, err)
require.NotEmpty(t, ex)
}
for _, notExpected := range tc.notExpectedSeries {
matchers[0].Value = strconv.Itoa(notExpected)
ex, err := q.Select(0, math.MaxInt64, matchers)
require.NoError(t, err)
require.Empty(t, ex)
}
})
}
}
func BenchmarkAddExemplar(t *testing.B) {
exs, err := NewCircularExemplarStorage(int64(t.N), eMetrics)
require.NoError(t, err)
es := exs.(*CircularExemplarStorage)
for i := 0; i < t.N; i++ {
l := labels.FromStrings("service", strconv.Itoa(i))
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)})
require.NoError(t, err)
}
}
func BenchmarkResizeExemplars(b *testing.B) {
testCases := []struct {
name string
startSize int64
endSize int64
numExemplars int
}{
{
name: "grow",
startSize: 100000,
endSize: 200000,
numExemplars: 150000,
},
{
name: "shrink",
startSize: 100000,
endSize: 50000,
numExemplars: 100000,
},
{
name: "grow",
startSize: 1000000,
endSize: 2000000,
numExemplars: 1500000,
},
{
name: "shrink",
startSize: 1000000,
endSize: 500000,
numExemplars: 1000000,
},
}
for _, tc := range testCases {
exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ {
l := labels.FromStrings("service", strconv.Itoa(i))
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)})
require.NoError(b, err)
}
saveIndex := es.index
saveExemplars := es.exemplars
b.Run(fmt.Sprintf("%s-%d-to-%d", tc.name, tc.startSize, tc.endSize), func(t *testing.B) {
es.index = saveIndex
es.exemplars = saveExemplars
b.ResetTimer()
es.Resize(tc.endSize)
})
}
}

View file

@ -30,6 +30,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
@ -70,16 +71,17 @@ type Head struct {
lastWALTruncationTime atomic.Int64 lastWALTruncationTime atomic.Int64
lastSeriesID atomic.Uint64 lastSeriesID atomic.Uint64
metrics *headMetrics metrics *headMetrics
opts *HeadOptions opts *HeadOptions
wal *wal.WAL wal *wal.WAL
exemplars ExemplarStorage exemplarMetrics *ExemplarMetrics
logger log.Logger exemplars ExemplarStorage
appendPool sync.Pool logger log.Logger
exemplarsPool sync.Pool appendPool sync.Pool
seriesPool sync.Pool exemplarsPool sync.Pool
bytesPool sync.Pool seriesPool sync.Pool
memChunkPool sync.Pool bytesPool sync.Pool
memChunkPool sync.Pool
// All series addressable by their ID or hash. // All series addressable by their ID or hash.
series *stripeSeries series *stripeSeries
@ -107,6 +109,7 @@ type Head struct {
closed bool closed bool
stats *HeadStats stats *HeadStats
reg prometheus.Registerer
} }
// HeadOptions are parameters for the Head block. // HeadOptions are parameters for the Head block.
@ -119,9 +122,12 @@ type HeadOptions struct {
// StripeSize sets the number of entries in the hash map, it must be a power of 2. // StripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
StripeSize int StripeSize int
SeriesCallback SeriesLifecycleCallback SeriesCallback SeriesLifecycleCallback
NumExemplars int EnableExemplarStorage bool
// Runtime reloadable options.
MaxExemplars atomic.Int64
} }
func DefaultHeadOptions() *HeadOptions { func DefaultHeadOptions() *HeadOptions {
@ -363,6 +369,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings
// NewHead opens the head block in dir. // NewHead opens the head block in dir.
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) { func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) {
var err error
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
@ -373,7 +380,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
opts.SeriesCallback = &noopSeriesLifecycleCallback{} opts.SeriesCallback = &noopSeriesLifecycleCallback{}
} }
es, err := NewCircularExemplarStorage(opts.NumExemplars, r) em := NewExemplarMetrics(r)
es, err := NewCircularExemplarStorage(opts.MaxExemplars.Load(), em)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -383,22 +391,24 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
} }
h := &Head{ h := &Head{
wal: wal, wal: wal,
logger: l, logger: l,
opts: opts, opts: opts,
exemplars: es, exemplarMetrics: em,
series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), exemplars: es,
symbols: map[string]struct{}{}, series: newStripeSeries(opts.StripeSize, opts.SeriesCallback),
postings: index.NewUnorderedMemPostings(), symbols: map[string]struct{}{},
tombstones: tombstones.NewMemTombstones(), postings: index.NewUnorderedMemPostings(),
iso: newIsolation(), tombstones: tombstones.NewMemTombstones(),
deleted: map[uint64]int{}, iso: newIsolation(),
deleted: map[uint64]int{},
memChunkPool: sync.Pool{ memChunkPool: sync.Pool{
New: func() interface{} { New: func() interface{} {
return &memChunk{} return &memChunk{}
}, },
}, },
stats: stats, stats: stats,
reg: r,
} }
h.chunkRange.Store(opts.ChunkRange) h.chunkRange.Store(opts.ChunkRange)
h.minTime.Store(math.MaxInt64) h.minTime.Store(math.MaxInt64)
@ -424,6 +434,26 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") } func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") }
func (h *Head) ApplyConfig(cfg *config.Config) error {
if !h.opts.EnableExemplarStorage {
return nil
}
// Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage
// to decide if it should pass exemplars along to it's exemplar storage, so we
// need to update opts.MaxExemplars here.
prevSize := h.opts.MaxExemplars.Load()
h.opts.MaxExemplars.Store(cfg.StorageConfig.ExemplarsConfig.MaxExemplars)
if prevSize == h.opts.MaxExemplars.Load() {
return nil
}
migrated := h.exemplars.(*CircularExemplarStorage).Resize(h.opts.MaxExemplars.Load())
level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", h.opts.MaxExemplars, "migrated", migrated)
return nil
}
func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
return h.exemplars.ExemplarQuerier(ctx) return h.exemplars.ExemplarQuerier(ctx)
} }
@ -1182,7 +1212,7 @@ func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64
func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
// Check if exemplar storage is enabled. // Check if exemplar storage is enabled.
if a.head.opts.NumExemplars <= 0 { if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
return 0, nil return 0, nil
} }
@ -1239,7 +1269,7 @@ func (h *Head) appender() *headAppender {
// Allocate the exemplars buffer only if exemplars are enabled. // Allocate the exemplars buffer only if exemplars are enabled.
var exemplarsBuf []exemplarWithSeriesRef var exemplarsBuf []exemplarWithSeriesRef
if h.opts.NumExemplars > 0 { if h.opts.EnableExemplarStorage {
exemplarsBuf = h.getExemplarBuffer() exemplarsBuf = h.getExemplarBuffer()
} }
@ -1253,7 +1283,6 @@ func (h *Head) appender() *headAppender {
exemplars: exemplarsBuf, exemplars: exemplarsBuf,
appendID: appendID, appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow, cleanupAppendIDsBelow: cleanupAppendIDsBelow,
exemplarAppender: h.exemplars,
} }
} }
@ -1270,19 +1299,6 @@ func max(a, b int64) int64 {
return b return b
} }
func (h *Head) ExemplarAppender() storage.ExemplarAppender {
h.metrics.activeAppenders.Inc()
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if h.MinTime() == math.MaxInt64 {
return &initAppender{
head: h,
}
}
return h.appender()
}
func (h *Head) getAppendBuffer() []record.RefSample { func (h *Head) getAppendBuffer() []record.RefSample {
b := h.appendPool.Get() b := h.appendPool.Get()
if b == nil { if b == nil {
@ -1345,10 +1361,9 @@ type exemplarWithSeriesRef struct {
} }
type headAppender struct { type headAppender struct {
head *Head head *Head
minValidTime int64 // No samples below this timestamp are allowed. minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64 mint, maxt int64
exemplarAppender ExemplarStorage
series []record.RefSeries series []record.RefSeries
samples []record.RefSample samples []record.RefSample
@ -1422,10 +1437,9 @@ func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64
// use getOrCreate or make any of the lset sanity checks that Append does. // use getOrCreate or make any of the lset sanity checks that Append does.
func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exemplar) (uint64, error) { func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exemplar) (uint64, error) {
// Check if exemplar storage is enabled. // Check if exemplar storage is enabled.
if a.head.opts.NumExemplars <= 0 { if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
return 0, nil return 0, nil
} }
s := a.head.series.getByID(ref) s := a.head.series.getByID(ref)
if s == nil { if s == nil {
return 0, fmt.Errorf("unknown series ref. when trying to add exemplar: %d", ref) return 0, fmt.Errorf("unknown series ref. when trying to add exemplar: %d", ref)
@ -1434,9 +1448,9 @@ func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Ex
// Ensure no empty labels have gotten through. // Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty() e.Labels = e.Labels.WithoutEmpty()
err := a.exemplarAppender.ValidateExemplar(s.lset, e) err := a.head.exemplars.ValidateExemplar(s.lset, e)
if err != nil { if err != nil {
if err == storage.ErrDuplicateExemplar { if err == storage.ErrDuplicateExemplar || err == storage.ErrExemplarsDisabled {
// Duplicate, don't return an error but don't accept the exemplar. // Duplicate, don't return an error but don't accept the exemplar.
return 0, nil return 0, nil
} }
@ -1525,7 +1539,7 @@ func (a *headAppender) Commit() (err error) {
for _, e := range a.exemplars { for _, e := range a.exemplars {
s := a.head.series.getByID(e.ref) s := a.head.series.getByID(e.ref)
// We don't instrument exemplar appends here, all is instrumented by storage. // We don't instrument exemplar appends here, all is instrumented by storage.
if err := a.exemplarAppender.AddExemplar(s.lset, e.exemplar); err != nil { if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
if err == storage.ErrOutOfOrderExemplar { if err == storage.ErrOutOfOrderExemplar {
continue continue
} }

View file

@ -30,6 +30,7 @@ import (
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
@ -51,7 +52,8 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkRange = chunkRange opts.ChunkRange = chunkRange
opts.ChunkDirRoot = dir opts.ChunkDirRoot = dir
opts.NumExemplars = 10 opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
h, err := NewHead(nil, nil, wlog, opts, nil) h, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)

View file

@ -18,6 +18,7 @@ import (
"os" "os"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
@ -38,12 +39,14 @@ func New(t testutil.T) *TestStorage {
opts := tsdb.DefaultOptions() opts := tsdb.DefaultOptions()
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.MaxExemplars = 10
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats()) db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
if err != nil { if err != nil {
t.Fatalf("Opening test storage failed: %s", err) t.Fatalf("Opening test storage failed: %s", err)
} }
es, err := tsdb.NewCircularExemplarStorage(10, nil) reg := prometheus.NewRegistry()
eMetrics := tsdb.NewExemplarMetrics(reg)
es, err := tsdb.NewCircularExemplarStorage(10, eMetrics)
if err != nil { if err != nil {
t.Fatalf("Opening test exemplar storage failed: %s", err) t.Fatalf("Opening test exemplar storage failed: %s", err)
} }