diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a017db39b2..f826cc08e3 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -131,9 +131,6 @@ type flagConfig struct { // setFeatureListOptions sets the corresponding options from the featureList. func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { - maxExemplars := c.tsdb.MaxExemplars - // Disabled at first. Value from the flag is used if exemplar-storage is set. - c.tsdb.MaxExemplars = 0 for _, f := range c.featureList { opts := strings.Split(f, ",") for _, o := range opts { @@ -151,8 +148,8 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { c.enableExpandExternalLabels = true level.Info(logger).Log("msg", "Experimental expand-external-labels enabled") case "exemplar-storage": - c.tsdb.MaxExemplars = maxExemplars - level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled", "maxExemplars", maxExemplars) + c.tsdb.EnableExemplarStorage = true + level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled") case "": continue default: @@ -283,9 +280,6 @@ func main() { a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well. 1MB as recommended by protobuf by default."). Default("1048576").IntVar(&cfg.web.RemoteReadBytesInFrame) - a.Flag("storage.exemplars.exemplars-limit", "[EXPERIMENTAL] Maximum number of exemplars to store in in-memory exemplar storage total. 0 disables the exemplar storage. This flag is effective only with --enable-feature=exemplar-storage."). - Default("100000").IntVar(&cfg.tsdb.MaxExemplars) - a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). Default("1h").SetValue(&cfg.outageTolerance) @@ -352,10 +346,18 @@ func main() { } // Throw error for invalid config before starting other components. - if _, err := config.LoadFile(cfg.configFile, false, log.NewNopLogger()); err != nil { + var cfgFile *config.Config + if cfgFile, err = config.LoadFile(cfg.configFile, false, log.NewNopLogger()); err != nil { level.Error(logger).Log("msg", fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "err", err) os.Exit(2) } + if cfg.tsdb.EnableExemplarStorage { + if cfgFile.StorageConfig.ExemplarsConfig == nil { + cfgFile.StorageConfig.ExemplarsConfig = &config.DefaultExemplarsConfig + } + cfg.tsdb.MaxExemplars = int64(cfgFile.StorageConfig.ExemplarsConfig.MaxExemplars) + } + // Now that the validity of the config is established, set the config // success metrics accordingly, although the config isn't really loaded // yet. 
This will happen later (including setting these metrics again), @@ -549,6 +551,9 @@ func main() { reloaders := []reloader{ { + name: "db_storage", + reloader: localStorage.ApplyConfig, + }, { name: "remote_storage", reloader: remoteStorage.ApplyConfig, }, { @@ -750,11 +755,11 @@ func main() { for { select { case <-hup: - if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { + if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) } case rc := <-webHandler.Reload(): - if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { + if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) rc <- err } else { @@ -786,7 +791,7 @@ func main() { return nil } - if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil { + if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { return errors.Wrapf(err, "error loading config from %q", cfg.configFile) } @@ -975,7 +980,7 @@ type reloader struct { reloader func(*config.Config) error } -func reloadConfig(filename string, expandExternalLabels bool, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...reloader) (err error) { +func reloadConfig(filename string, expandExternalLabels bool, enableExemplarStorage bool, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...reloader) (err error) { start := time.Now() timings := []interface{}{} level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) @@ -994,6 +999,12 @@ func reloadConfig(filename string, expandExternalLabels bool, logger log.Logger, return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename) } + if enableExemplarStorage { + if conf.StorageConfig.ExemplarsConfig == nil { + conf.StorageConfig.ExemplarsConfig = &config.DefaultExemplarsConfig + } + } + failed := false for _, rl := range rls { rstart := time.Now() @@ -1099,6 +1110,11 @@ type readyStorage struct { stats *tsdb.DBStats } +func (s *readyStorage) ApplyConfig(conf *config.Config) error { + db := s.get() + return db.ApplyConfig(conf) +} + // Set the storage. 
func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) { s.mtx.Lock() @@ -1274,7 +1290,8 @@ type tsdbOptions struct { StripeSize int MinBlockDuration model.Duration MaxBlockDuration model.Duration - MaxExemplars int + EnableExemplarStorage bool + MaxExemplars int64 } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -1289,6 +1306,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { StripeSize: opts.StripeSize, MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), + EnableExemplarStorage: opts.EnableExemplarStorage, MaxExemplars: opts.MaxExemplars, } } diff --git a/config/config.go b/config/config.go index af716d1f0d..b560faacef 100644 --- a/config/config.go +++ b/config/config.go @@ -183,6 +183,15 @@ var ( RemoteTimeout: model.Duration(1 * time.Minute), HTTPClientConfig: config.DefaultHTTPClientConfig, } + + // DefaultStorageConfig is the default TSDB/Exemplar storage configuration. + DefaultStorageConfig = StorageConfig{ + ExemplarsConfig: &DefaultExemplarsConfig, + } + + DefaultExemplarsConfig = ExemplarsConfig{ + MaxExemplars: 100000, + } ) // Config is the top-level configuration for Prometheus's config files. @@ -191,6 +200,7 @@ type Config struct { AlertingConfig AlertingConfig `yaml:"alerting,omitempty"` RuleFiles []string `yaml:"rule_files,omitempty"` ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"` + StorageConfig StorageConfig `yaml:"storage,omitempty"` RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"` RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"` @@ -464,6 +474,18 @@ func (c *ScrapeConfig) MarshalYAML() (interface{}, error) { return discovery.MarshalYAMLWithInlineConfigs(c) } +// StorageConfig configures runtime reloadable configuration options. +type StorageConfig struct { + ExemplarsConfig *ExemplarsConfig `yaml:"exemplars,omitempty"` +} + +// ExemplarsConfig configures runtime reloadable configuration options. +type ExemplarsConfig struct { + // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. + // Use a value of 0 or less than 0 to disable the storage without having to restart Prometheus. + MaxExemplars int64 `yaml:"max_exemplars,omitempty"` +} + // AlertingConfig configures alerting and alertmanager related configs. type AlertingConfig struct { AlertRelabelConfigs []*relabel.Config `yaml:"alert_relabel_configs,omitempty"` diff --git a/storage/interface.go b/storage/interface.go index e017d93173..92ad15b8c0 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -33,6 +33,7 @@ var ( ErrOutOfOrderExemplar = errors.New("out of order exemplar") ErrDuplicateExemplar = errors.New("duplicate exemplar") ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) + ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0") ) // Appendable allows creating appenders. 
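With the `--storage.exemplars.exemplars-limit` flag removed above, the exemplar buffer size now comes from the configuration file (and is therefore reloadable), while `--enable-feature=exemplar-storage` only switches the feature on. The sketch below is not part of the patch: it uses small local mirrors of the new `StorageConfig`/`ExemplarsConfig` types instead of importing the real `config` package, purely to show how the new `storage` block nests in `prometheus.yml`; the 150000 limit is illustrative.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Local mirrors of the config.ExemplarsConfig / config.StorageConfig types
// added above, copied here only so the sketch stays self-contained.
type exemplarsConfig struct {
	MaxExemplars int64 `yaml:"max_exemplars,omitempty"`
}

type storageConfig struct {
	ExemplarsConfig *exemplarsConfig `yaml:"exemplars,omitempty"`
}

type fileConfig struct {
	StorageConfig storageConfig `yaml:"storage,omitempty"`
}

func main() {
	// How the new section nests in prometheus.yml; it is only honoured when
	// Prometheus runs with --enable-feature=exemplar-storage.
	raw := `
storage:
  exemplars:
    max_exemplars: 150000
`
	var cfg fileConfig
	if err := yaml.UnmarshalStrict([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.StorageConfig.ExemplarsConfig.MaxExemplars) // 150000
}
```

If the `exemplars` block is omitted while the feature flag is set, `main.go` above substitutes `DefaultExemplarsConfig`, i.e. a limit of 100000.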
diff --git a/tsdb/db.go b/tsdb/db.go index 6507c25f8e..b5a0c99593 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -147,9 +148,12 @@ type Options struct { // mainly meant for external users who import TSDB. BlocksToDelete BlocksToDeleteFunc + // Enables the in-memory exemplar storage. + EnableExemplarStorage bool + // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage. - MaxExemplars int + MaxExemplars int64 } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -716,7 +720,8 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize headOpts.StripeSize = opts.StripeSize headOpts.SeriesCallback = opts.SeriesLifecycleCallback - headOpts.NumExemplars = opts.MaxExemplars + headOpts.EnableExemplarStorage = opts.EnableExemplarStorage + headOpts.MaxExemplars.Store(opts.MaxExemplars) db.head, err = NewHead(r, l, wlog, headOpts, stats.Head) if err != nil { return nil, err @@ -838,6 +843,10 @@ func (db *DB) Appender(ctx context.Context) storage.Appender { return dbAppender{db: db, Appender: db.head.Appender(ctx)} } +func (db *DB) ApplyConfig(conf *config.Config) error { + return db.head.ApplyConfig(conf) +} + // dbAppender wraps the DB's head appender and triggers compactions on commit // if necessary. type dbAppender struct { diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index d5b7be57f7..3aa9607bb4 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -20,21 +20,20 @@ import ( "unicode/utf8" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) -type CircularExemplarStorage struct { - exemplarsAppended prometheus.Counter - exemplarsInStorage prometheus.Gauge - seriesWithExemplarsInStorage prometheus.Gauge - lastExemplarsTs prometheus.Gauge - outOfOrderExemplars prometheus.Counter +// Indicates that there is no index entry for an exemplar. +const noExemplar = -1 +type CircularExemplarStorage struct { lock sync.RWMutex exemplars []*circularBufferEntry nextIndex int + metrics *ExemplarMetrics // Map of series labels as a string to index entry, which points to the first // and last exemplar for the series in the exemplars circular buffer. @@ -53,17 +52,17 @@ type circularBufferEntry struct { ref *indexEntry } -// NewCircularExemplarStorage creates an circular in memory exemplar storage. -// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in -// 1GB of extra memory, accounting for the fact that this is heap allocated space. -// If len < 1, then the exemplar storage is disabled.
-func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarStorage, error) { - if len < 1 { - return &noopExemplarStorage{}, nil - } - c := &CircularExemplarStorage{ - exemplars: make([]*circularBufferEntry, len), - index: make(map[string]*indexEntry), +type ExemplarMetrics struct { + exemplarsAppended prometheus.Counter + exemplarsInStorage prometheus.Gauge + seriesWithExemplarsInStorage prometheus.Gauge + lastExemplarsTs prometheus.Gauge + maxExemplars prometheus.Gauge + outOfOrderExemplars prometheus.Counter +} + +func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics { + m := ExemplarMetrics{ exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_exemplar_exemplars_appended_total", Help: "Total number of appended exemplars.", @@ -86,20 +85,51 @@ func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarSto Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total", Help: "Total number of out of order exemplar ingestion failed attempts.", }), + maxExemplars: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_exemplar_max_exemplars", + Help: "Total number of exemplars the exemplar storage can store, resizeable.", + }), } + if reg != nil { reg.MustRegister( - c.exemplarsAppended, - c.exemplarsInStorage, - c.seriesWithExemplarsInStorage, - c.lastExemplarsTs, - c.outOfOrderExemplars, + m.exemplarsAppended, + m.exemplarsInStorage, + m.seriesWithExemplarsInStorage, + m.lastExemplarsTs, + m.outOfOrderExemplars, + m.maxExemplars, ) } + return &m +} + +// NewCircularExemplarStorage creates a circular in-memory exemplar storage. +// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in +// 1GB of extra memory, accounting for the fact that this is heap allocated space. +// If len <= 0, then the exemplar storage is essentially a noop storage but can later be +// resized to store exemplars. +func NewCircularExemplarStorage(len int64, m *ExemplarMetrics) (ExemplarStorage, error) { + if len < 0 { + len = 0 + } + c := &CircularExemplarStorage{ + exemplars: make([]*circularBufferEntry, len), + index: make(map[string]*indexEntry), + metrics: m, + } + + c.metrics.maxExemplars.Set(float64(len)) + return c, nil } +func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error { + ce.Resize(cfg.StorageConfig.ExemplarsConfig.MaxExemplars) + return nil +} + func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage { return ce } @@ -116,6 +146,10 @@ func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQ func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { ret := make([]exemplar.QueryResult, 0) + if len(ce.exemplars) <= 0 { + return ret, nil + } + ce.lock.RLock() defer ce.lock.RUnlock() @@ -136,7 +170,7 @@ func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*label if e.exemplar.Ts >= start { se.Exemplars = append(se.Exemplars, e.exemplar) } - if e.next == -1 { + if e.next == noExemplar { break } e = ce.exemplars[e.next] @@ -179,6 +213,10 @@ func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar. // Not thread safe. The append parameters tells us whether this is an external validation, or internal // as a result of an AddExemplar call, in which case we should update any relevant metrics.
func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exemplar, append bool) error { + if len(ce.exemplars) <= 0 { + return storage.ErrExemplarsDisabled + } + // Exemplar label length does not include chars involved in text rendering such as quotes // equals sign, or commas. See definition of const ExemplarMaxLabelLength. labelSetLen := 0 @@ -204,14 +242,92 @@ func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exempla if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts { if append { - ce.outOfOrderExemplars.Inc() + ce.metrics.outOfOrderExemplars.Inc() } return storage.ErrOutOfOrderExemplar } return nil } +// Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. +// Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed. +func (ce *CircularExemplarStorage) Resize(l int64) int { + // Accept negative values as just 0 size. + if l <= 0 { + l = 0 + } + + if l == int64(len(ce.exemplars)) { + return 0 + } + + ce.lock.Lock() + defer ce.lock.Unlock() + + oldBuffer := ce.exemplars + oldNextIndex := int64(ce.nextIndex) + + ce.exemplars = make([]*circularBufferEntry, l) + ce.index = make(map[string]*indexEntry) + ce.nextIndex = 0 + + // Replay as many entries as needed, starting with oldest first. + count := int64(len(oldBuffer)) + if l < count { + count = l + } + + migrated := 0 + + if l > 0 { + // Rewind previous next index by count with wrap-around. + // This math is essentially looking at nextIndex, where we would write the next exemplar to, + // and find the index in the old exemplar buffer that we should start migrating exemplars from. + // This way we don't migrate exemplars that would just be overwritten when migrating later exemplars. + var startIndex int64 = (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer)) + + for i := int64(0); i < count; i++ { + idx := (startIndex + i) % int64(len(oldBuffer)) + if entry := oldBuffer[idx]; entry != nil { + ce.migrate(entry) + migrated++ + } + } + } + + ce.computeMetrics() + ce.metrics.maxExemplars.Set(float64(l)) + + return migrated +} + +// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires +// external lock and does not compute metrics. +func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) { + seriesLabels := entry.ref.seriesLabels.String() + + idx, ok := ce.index[seriesLabels] + if !ok { + idx = entry.ref + idx.oldest = ce.nextIndex + ce.index[seriesLabels] = idx + } else { + entry.ref = idx + ce.exemplars[idx.newest].next = ce.nextIndex + } + idx.newest = ce.nextIndex + + entry.next = noExemplar + ce.exemplars[ce.nextIndex] = entry + + ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) +} + func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { + if len(ce.exemplars) <= 0 { + return storage.ErrExemplarsDisabled + } + seriesLabels := l.String() // TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. @@ -241,7 +357,7 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp // There exists exemplar already on this ce.nextIndex entry, drop it, to make place // for others. prevLabels := prev.ref.seriesLabels.String() - if prev.next == -1 { + if prev.next == noExemplar { // Last item for this series, remove index entry. 
delete(ce.index, prevLabels) } else { @@ -251,43 +367,36 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp // Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select) // since this is the first exemplar stored for this series. + ce.exemplars[ce.nextIndex].next = noExemplar ce.exemplars[ce.nextIndex].exemplar = e - ce.exemplars[ce.nextIndex].next = -1 ce.exemplars[ce.nextIndex].ref = ce.index[seriesLabels] ce.index[seriesLabels].newest = ce.nextIndex ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) - ce.exemplarsAppended.Inc() - ce.seriesWithExemplarsInStorage.Set(float64(len(ce.index))) + ce.metrics.exemplarsAppended.Inc() + ce.computeMetrics() + return nil +} + +func (ce *CircularExemplarStorage) computeMetrics() { + ce.metrics.seriesWithExemplarsInStorage.Set(float64(len(ce.index))) + + if len(ce.exemplars) == 0 { + ce.metrics.exemplarsInStorage.Set(float64(0)) + ce.metrics.lastExemplarsTs.Set(float64(0)) + return + } + if next := ce.exemplars[ce.nextIndex]; next != nil { - ce.exemplarsInStorage.Set(float64(len(ce.exemplars))) - ce.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000) - return nil + ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars))) + ce.metrics.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000) + return } // We did not yet fill the buffer. - ce.exemplarsInStorage.Set(float64(ce.nextIndex)) - ce.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000) - return nil -} - -type noopExemplarStorage struct{} - -func (noopExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { - return nil -} - -func (noopExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error { - return nil -} - -func (noopExemplarStorage) ExemplarQuerier(context.Context) (storage.ExemplarQuerier, error) { - return &noopExemplarQuerier{}, nil -} - -type noopExemplarQuerier struct{} - -func (noopExemplarQuerier) Select(_, _ int64, _ ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { - return nil, nil + ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex)) + if ce.exemplars[0] != nil { + ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000) + } } diff --git a/tsdb/exemplar_test.go b/tsdb/exemplar_test.go index 55e6fd3752..c81b3ffced 100644 --- a/tsdb/exemplar_test.go +++ b/tsdb/exemplar_test.go @@ -14,6 +14,9 @@ package tsdb import ( + "context" + "fmt" + "math" "reflect" "strconv" "strings" @@ -21,14 +24,17 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) +var eMetrics = NewExemplarMetrics(prometheus.DefaultRegisterer) + // Tests the same exemplar cases as AddExemplar, but specifically the ValidateExemplar function so it can be relied on externally. 
func TestValidateExemplar(t *testing.T) { - exs, err := NewCircularExemplarStorage(2, nil) + exs, err := NewCircularExemplarStorage(2, eMetrics) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -87,7 +93,7 @@ func TestValidateExemplar(t *testing.T) { } func TestAddExemplar(t *testing.T) { - exs, err := NewCircularExemplarStorage(2, nil) + exs, err := NewCircularExemplarStorage(2, eMetrics) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -150,7 +156,7 @@ func TestStorageOverflow(t *testing.T) { // Test that circular buffer index and assignment // works properly, adding more exemplars than can // be stored and then querying for them. - exs, err := NewCircularExemplarStorage(5, nil) + exs, err := NewCircularExemplarStorage(5, eMetrics) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -185,7 +191,7 @@ func TestStorageOverflow(t *testing.T) { } func TestSelectExemplar(t *testing.T) { - exs, err := NewCircularExemplarStorage(5, nil) + exs, err := NewCircularExemplarStorage(5, eMetrics) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -216,7 +222,7 @@ func TestSelectExemplar(t *testing.T) { } func TestSelectExemplar_MultiSeries(t *testing.T) { - exs, err := NewCircularExemplarStorage(5, nil) + exs, err := NewCircularExemplarStorage(5, eMetrics) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -273,8 +279,8 @@ func TestSelectExemplar_MultiSeries(t *testing.T) { } func TestSelectExemplar_TimeRange(t *testing.T) { - lenEs := 5 - exs, err := NewCircularExemplarStorage(lenEs, nil) + var lenEs int64 = 5 + exs, err := NewCircularExemplarStorage(lenEs, eMetrics) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -282,7 +288,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) { {Name: "service", Value: "asdf"}, } - for i := 0; i < lenEs; i++ { + for i := 0; int64(i) < lenEs; i++ { err := es.AddExemplar(l, exemplar.Exemplar{ Labels: labels.Labels{ labels.Label{ @@ -308,7 +314,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) { // Test to ensure that even though a series matches more than one matcher from the // query that it's exemplars are only included in the result a single time. 
func TestSelectExemplar_DuplicateSeries(t *testing.T) { - exs, err := NewCircularExemplarStorage(4, nil) + exs, err := NewCircularExemplarStorage(4, eMetrics) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -349,7 +355,7 @@ func TestSelectExemplar_DuplicateSeries(t *testing.T) { } func TestIndexOverwrite(t *testing.T) { - exs, err := NewCircularExemplarStorage(2, nil) + exs, err := NewCircularExemplarStorage(2, eMetrics) require.NoError(t, err) es := exs.(*CircularExemplarStorage) @@ -380,3 +386,162 @@ func TestIndexOverwrite(t *testing.T) { i := es.index[l2.String()] require.Equal(t, &indexEntry{0, 0, l2}, i) } + +func TestResize(t *testing.T) { + testCases := []struct { + name string + startSize int64 + newCount int64 + expectedSeries []int + notExpectedSeries []int + expectedMigrated int + }{ + { + name: "Grow", + startSize: 100, + newCount: 200, + expectedSeries: []int{99, 98, 1, 0}, + notExpectedSeries: []int{100}, + expectedMigrated: 100, + }, + { + name: "Shrink", + startSize: 100, + newCount: 50, + expectedSeries: []int{99, 98, 50}, + notExpectedSeries: []int{49, 1, 0}, + expectedMigrated: 50, + }, + { + name: "Zero", + startSize: 100, + newCount: 0, + expectedSeries: []int{}, + notExpectedSeries: []int{}, + expectedMigrated: 0, + }, + { + name: "Negative", + startSize: 100, + newCount: -1, + expectedSeries: []int{}, + notExpectedSeries: []int{}, + expectedMigrated: 0, + }, + { + name: "NegativeToNegative", + startSize: -1, + newCount: -2, + expectedSeries: []int{}, + notExpectedSeries: []int{}, + expectedMigrated: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + for i := 0; int64(i) < tc.startSize; i++ { + err = es.AddExemplar(labels.FromStrings("service", strconv.Itoa(i)), exemplar.Exemplar{ + Value: float64(i), + Ts: int64(i)}) + require.NoError(t, err) + } + + resized := es.Resize(tc.newCount) + require.Equal(t, tc.expectedMigrated, resized) + + q, err := es.Querier(context.TODO()) + require.NoError(t, err) + + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "service", "")} + + for _, expected := range tc.expectedSeries { + matchers[0].Value = strconv.Itoa(expected) + ex, err := q.Select(0, math.MaxInt64, matchers) + require.NoError(t, err) + require.NotEmpty(t, ex) + } + + for _, notExpected := range tc.notExpectedSeries { + matchers[0].Value = strconv.Itoa(notExpected) + ex, err := q.Select(0, math.MaxInt64, matchers) + require.NoError(t, err) + require.Empty(t, ex) + } + }) + } +} + +func BenchmarkAddExemplar(t *testing.B) { + exs, err := NewCircularExemplarStorage(int64(t.N), eMetrics) + require.NoError(t, err) + es := exs.(*CircularExemplarStorage) + + for i := 0; i < t.N; i++ { + l := labels.FromStrings("service", strconv.Itoa(i)) + + err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)}) + require.NoError(t, err) + } +} + +func BenchmarkResizeExemplars(b *testing.B) { + testCases := []struct { + name string + startSize int64 + endSize int64 + numExemplars int + }{ + { + name: "grow", + startSize: 100000, + endSize: 200000, + numExemplars: 150000, + }, + { + name: "shrink", + startSize: 100000, + endSize: 50000, + numExemplars: 100000, + }, + { + name: "grow", + startSize: 1000000, + endSize: 2000000, + numExemplars: 1500000, + }, + { + name: "shrink", + startSize: 1000000, + endSize: 500000, + numExemplars: 1000000, + }, + } + + for _, tc := 
range testCases { + exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics) + require.NoError(b, err) + es := exs.(*CircularExemplarStorage) + + for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ { + l := labels.FromStrings("service", strconv.Itoa(i)) + + err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)}) + require.NoError(b, err) + } + saveIndex := es.index + saveExemplars := es.exemplars + + b.Run(fmt.Sprintf("%s-%d-to-%d", tc.name, tc.startSize, tc.endSize), func(t *testing.B) { + es.index = saveIndex + es.exemplars = saveExemplars + b.ResetTimer() + es.Resize(tc.endSize) + }) + } +} diff --git a/tsdb/head.go b/tsdb/head.go index e3fb8a3f59..0b35742331 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -70,16 +71,17 @@ type Head struct { lastWALTruncationTime atomic.Int64 lastSeriesID atomic.Uint64 - metrics *headMetrics - opts *HeadOptions - wal *wal.WAL - exemplars ExemplarStorage - logger log.Logger - appendPool sync.Pool - exemplarsPool sync.Pool - seriesPool sync.Pool - bytesPool sync.Pool - memChunkPool sync.Pool + metrics *headMetrics + opts *HeadOptions + wal *wal.WAL + exemplarMetrics *ExemplarMetrics + exemplars ExemplarStorage + logger log.Logger + appendPool sync.Pool + exemplarsPool sync.Pool + seriesPool sync.Pool + bytesPool sync.Pool + memChunkPool sync.Pool // All series addressable by their ID or hash. series *stripeSeries @@ -107,6 +109,7 @@ type Head struct { closed bool stats *HeadStats + reg prometheus.Registerer } // HeadOptions are parameters for the Head block. @@ -119,9 +122,12 @@ type HeadOptions struct { // StripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. - StripeSize int - SeriesCallback SeriesLifecycleCallback - NumExemplars int + StripeSize int + SeriesCallback SeriesLifecycleCallback + EnableExemplarStorage bool + + // Runtime reloadable options. + MaxExemplars atomic.Int64 } func DefaultHeadOptions() *HeadOptions { @@ -363,6 +369,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings // NewHead opens the head block in dir. 
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) { + var err error if l == nil { l = log.NewNopLogger() } @@ -373,7 +380,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti opts.SeriesCallback = &noopSeriesLifecycleCallback{} } - es, err := NewCircularExemplarStorage(opts.NumExemplars, r) + em := NewExemplarMetrics(r) + es, err := NewCircularExemplarStorage(opts.MaxExemplars.Load(), em) if err != nil { return nil, err } @@ -383,22 +391,24 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti } h := &Head{ - wal: wal, - logger: l, - opts: opts, - exemplars: es, - series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), - symbols: map[string]struct{}{}, - postings: index.NewUnorderedMemPostings(), - tombstones: tombstones.NewMemTombstones(), - iso: newIsolation(), - deleted: map[uint64]int{}, + wal: wal, + logger: l, + opts: opts, + exemplarMetrics: em, + exemplars: es, + series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), + symbols: map[string]struct{}{}, + postings: index.NewUnorderedMemPostings(), + tombstones: tombstones.NewMemTombstones(), + iso: newIsolation(), + deleted: map[uint64]int{}, memChunkPool: sync.Pool{ New: func() interface{} { return &memChunk{} }, }, stats: stats, + reg: r, } h.chunkRange.Store(opts.ChunkRange) h.minTime.Store(math.MaxInt64) @@ -424,6 +434,26 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") } +func (h *Head) ApplyConfig(cfg *config.Config) error { + if !h.opts.EnableExemplarStorage { + return nil + } + + // Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage + // to decide if it should pass exemplars along to its exemplar storage, so we + // need to update opts.MaxExemplars here. + prevSize := h.opts.MaxExemplars.Load() + h.opts.MaxExemplars.Store(cfg.StorageConfig.ExemplarsConfig.MaxExemplars) + + if prevSize == h.opts.MaxExemplars.Load() { + return nil + } + + migrated := h.exemplars.(*CircularExemplarStorage).Resize(h.opts.MaxExemplars.Load()) + level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", h.opts.MaxExemplars, "migrated", migrated) + return nil +} + func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { return h.exemplars.ExemplarQuerier(ctx) } @@ -1182,7 +1212,7 @@ func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64 func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { // Check if exemplar storage is enabled. - if a.head.opts.NumExemplars <= 0 { + if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { return 0, nil } @@ -1239,7 +1269,7 @@ func (h *Head) appender() *headAppender { // Allocate the exemplars buffer only if exemplars are enabled. var exemplarsBuf []exemplarWithSeriesRef - if h.opts.NumExemplars > 0 { + if h.opts.EnableExemplarStorage { exemplarsBuf = h.getExemplarBuffer() } @@ -1253,7 +1283,6 @@ func (h *Head) appender() *headAppender { exemplars: exemplarsBuf, appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, - exemplarAppender: h.exemplars, } } @@ -1270,19 +1299,6 @@ func max(a, b int64) int64 { return b } -func (h *Head) ExemplarAppender() storage.ExemplarAppender { - h.metrics.activeAppenders.Inc() - - // The head cache might not have a starting point yet.
The init appender - // picks up the first appended timestamp as the base. - if h.MinTime() == math.MaxInt64 { - return &initAppender{ - head: h, - } - } - return h.appender() -} - func (h *Head) getAppendBuffer() []record.RefSample { b := h.appendPool.Get() if b == nil { @@ -1345,10 +1361,9 @@ type exemplarWithSeriesRef struct { } type headAppender struct { - head *Head - minValidTime int64 // No samples below this timestamp are allowed. - mint, maxt int64 - exemplarAppender ExemplarStorage + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 series []record.RefSeries samples []record.RefSample @@ -1422,10 +1437,9 @@ func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64 // use getOrCreate or make any of the lset sanity checks that Append does. func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exemplar) (uint64, error) { // Check if exemplar storage is enabled. - if a.head.opts.NumExemplars <= 0 { + if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { return 0, nil } - s := a.head.series.getByID(ref) if s == nil { return 0, fmt.Errorf("unknown series ref. when trying to add exemplar: %d", ref) @@ -1434,9 +1448,9 @@ func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Ex // Ensure no empty labels have gotten through. e.Labels = e.Labels.WithoutEmpty() - err := a.exemplarAppender.ValidateExemplar(s.lset, e) + err := a.head.exemplars.ValidateExemplar(s.lset, e) if err != nil { - if err == storage.ErrDuplicateExemplar { + if err == storage.ErrDuplicateExemplar || err == storage.ErrExemplarsDisabled { // Duplicate, don't return an error but don't accept the exemplar. return 0, nil } @@ -1525,7 +1539,7 @@ func (a *headAppender) Commit() (err error) { for _, e := range a.exemplars { s := a.head.series.getByID(e.ref) // We don't instrument exemplar appends here, all is instrumented by storage. - if err := a.exemplarAppender.AddExemplar(s.lset, e.exemplar); err != nil { + if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil { if err == storage.ErrOutOfOrderExemplar { continue } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 371e0ecd62..df71b876f6 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -30,6 +30,7 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -51,7 +52,8 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. 
opts := DefaultHeadOptions() opts.ChunkRange = chunkRange opts.ChunkDirRoot = dir - opts.NumExemplars = 10 + opts.EnableExemplarStorage = true + opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) h, err := NewHead(nil, nil, wlog, opts, nil) require.NoError(t, err) diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index 75c179c522..53c4ece132 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -18,6 +18,7 @@ import ( "os" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -38,12 +39,14 @@ func New(t testutil.T) *TestStorage { opts := tsdb.DefaultOptions() opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) - opts.MaxExemplars = 10 db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats()) if err != nil { t.Fatalf("Opening test storage failed: %s", err) } - es, err := tsdb.NewCircularExemplarStorage(10, nil) + reg := prometheus.NewRegistry() + eMetrics := tsdb.NewExemplarMetrics(reg) + + es, err := tsdb.NewCircularExemplarStorage(10, eMetrics) if err != nil { t.Fatalf("Opening test exemplar storage failed: %s", err) }
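Taken together with the `db_storage` reloader registered in `main.go`, these changes let a running TSDB resize its exemplar buffer without a restart. Below is a minimal sketch of driving that path directly through the new `DB.ApplyConfig` hook, assuming a program that vendors this branch; the temporary directory and the 100000→50000 sizes are illustrative, and error handling is deliberately blunt:

```go
package main

import (
	"io/ioutil"
	"log"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	dir, err := ioutil.TempDir("", "exemplar-resize-demo")
	if err != nil {
		log.Fatal(err)
	}

	// Open a TSDB with exemplar storage enabled, starting from the new defaults.
	opts := tsdb.DefaultOptions()
	opts.EnableExemplarStorage = true
	opts.MaxExemplars = 100000

	db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Shrink the circular exemplar buffer at runtime, as the "db_storage"
	// reloader does when prometheus.yml changes and a reload is triggered.
	newCfg := &config.Config{
		StorageConfig: config.StorageConfig{
			ExemplarsConfig: &config.ExemplarsConfig{MaxExemplars: 50000},
		},
	}
	if err := db.ApplyConfig(newCfg); err != nil {
		log.Fatal(err)
	}
}
```

Internally this is `DB.ApplyConfig` → `Head.ApplyConfig` → `CircularExemplarStorage.Resize`, the same chain `reloadConfig` exercises on SIGHUP or a reload triggered via the web handler.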
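The least obvious part of `Resize` above is which entries survive a shrink: the write cursor is rewound by `min(old, new)` slots so that migration starts at the oldest exemplar that would not simply be overwritten again. The standalone toy model below shows just that index arithmetic, with plain ints standing in for `*circularBufferEntry` and an invented `survivors` helper; it is an illustration, not the real implementation:

```go
package main

import "fmt"

// survivors is a toy stand-in for the migration loop in
// CircularExemplarStorage.Resize: given the old circular buffer, its write
// cursor (nextIndex) and the new length l, it returns the entries that would
// be migrated, oldest first.
func survivors(old []int, nextIndex, l int) []int {
	if l <= 0 || len(old) == 0 {
		return nil
	}
	count := len(old)
	if l < count {
		count = l
	}
	// Rewind the write cursor by count, with wrap-around, to find the oldest
	// entry that will not simply be overwritten again after migration.
	start := (nextIndex - count + len(old)) % len(old)
	out := make([]int, 0, count)
	for i := 0; i < count; i++ {
		out = append(out, old[(start+i)%len(old)])
	}
	return out
}

func main() {
	// A full buffer of six exemplars written in order 0..5; the cursor has
	// wrapped back to slot 0, so 5 is the newest entry.
	old := []int{0, 1, 2, 3, 4, 5}
	fmt.Println(survivors(old, 0, 3)) // shrink: keeps the newest three -> [3 4 5]
	fmt.Println(survivors(old, 0, 8)) // grow: keeps everything -> [0 1 2 3 4 5]
}
```

Growing therefore migrates every stored exemplar, while shrinking silently drops the oldest ones in ingest order, which is exactly what the `Shrink` case in `TestResize` asserts (series 0 and 1 gone, 98 and 99 kept).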