Initial draft of prometheus-agent (#8785)

* Initial draft of prometheus-agent

This commit introduces a new binary, prometheus-agent, based on the
Grafana Agent code. It runs a WAL-only version of prometheus without the
TSDB, alerting, or rule evaluations. It is intended to be used to
remote_write to Prometheus or another remote_write receiver.

By default, prometheus-agent will listen on port 9095 to not collide
with the prometheus default of 9090.

Truncation of the WAL cooperates on a best-effort case with Remote
Write. Every time the WAL is truncated, the minimum timestamp of data to
truncate is determined by the lowest sent timestamp of all samples
across all remote_write endpoints. This gives loose guarantees that data
from the WAL will not try to be removed until the maximum sample
lifetime passes or remote_write starts functionining.

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* add tests for Prometheus agent (#22)

* add tests for Prometheus agent

* add tests for Prometheus agent

* rearranged tests as per the review comments

* update tests for Agent

* changes as per code review comments

Signed-off-by: SriKrishna Paparaju <paparaju@gmail.com>

* incremental changes to prometheus agent

Signed-off-by: SriKrishna Paparaju <paparaju@gmail.com>

* changes as per code review comments

Signed-off-by: SriKrishna Paparaju <paparaju@gmail.com>

* Commit feedback from code review

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* Port over some comments from grafana/agent

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* Rename agent.Storage to agent.DB for tsdb consistency

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* Consolidate agentMode ifs in cmd/prometheus/main.go

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* Document PreAction usage requirements better for agent mode flags

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* remove unnecessary defaultListenAddr

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* `go fmt ./tsdb/agent` and fix lint errors

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

Co-authored-by: SriKrishna Paparaju <paparaju@gmail.com>
This commit is contained in:
Robert Fratto 2021-10-29 11:25:05 -04:00 committed by GitHub
parent c91c2bbea5
commit bc72a718c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1802 additions and 49 deletions

2
.gitignore vendored
View file

@ -8,7 +8,9 @@
/promtool /promtool
benchmark.txt benchmark.txt
/data /data
/data-agent
/cmd/prometheus/data /cmd/prometheus/data
/cmd/prometheus/data-agent
/cmd/prometheus/debug /cmd/prometheus/debug
/benchout /benchout
/cmd/promtool/data /cmd/promtool/data

View file

@ -72,11 +72,14 @@ import (
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/agent"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web"
) )
var ( var (
appName = "prometheus"
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{ configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_config_last_reload_successful", Name: "prometheus_config_last_reload_successful",
Help: "Whether the last configuration reload attempt was successful.", Help: "Whether the last configuration reload attempt was successful.",
@ -88,10 +91,13 @@ var (
defaultRetentionString = "15d" defaultRetentionString = "15d"
defaultRetentionDuration model.Duration defaultRetentionDuration model.Duration
agentMode bool
agentOnlyFlags, serverOnlyFlags []string
) )
func init() { func init() {
prometheus.MustRegister(version.NewCollector("prometheus")) prometheus.MustRegister(version.NewCollector(strings.ReplaceAll(appName, "-", "_")))
var err error var err error
defaultRetentionDuration, err = model.ParseDuration(defaultRetentionString) defaultRetentionDuration, err = model.ParseDuration(defaultRetentionString)
@ -100,6 +106,37 @@ func init() {
} }
} }
// agentOnlySetting can be provided to a kingpin flag's PreAction to mark a
// flag as agent-only.
func agentOnlySetting() func(*kingpin.ParseContext) error {
return func(pc *kingpin.ParseContext) error {
agentOnlyFlags = append(agentOnlyFlags, extractFlagName(pc))
return nil
}
}
// serverOnlySetting can be provided to a kingpin flag's PreAction to mark a
// flag as server-only.
func serverOnlySetting() func(*kingpin.ParseContext) error {
return func(pc *kingpin.ParseContext) error {
serverOnlyFlags = append(serverOnlyFlags, extractFlagName(pc))
return nil
}
}
// extractFlagName gets the flag name from the ParseContext. Only call
// from agentOnlySetting or serverOnlySetting.
func extractFlagName(pc *kingpin.ParseContext) string {
for _, pe := range pc.Elements {
fc, ok := pe.Clause.(*kingpin.FlagClause)
if !ok {
continue
}
return fc.Model().Name
}
panic("extractFlagName not called from a kingpin PreAction. This is a bug, please report to Prometheus.")
}
type flagConfig struct { type flagConfig struct {
configFile string configFile string
@ -111,6 +148,7 @@ type flagConfig struct {
web web.Options web web.Options
scrape scrape.Options scrape scrape.Options
tsdb tsdbOptions tsdb tsdbOptions
agent agentOptions
lookbackDelta model.Duration lookbackDelta model.Duration
webTimeout model.Duration webTimeout model.Duration
queryTimeout model.Duration queryTimeout model.Duration
@ -196,10 +234,12 @@ func main() {
a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server").UsageWriter(os.Stdout) a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server").UsageWriter(os.Stdout)
a.Version(version.Print("prometheus")) a.Version(version.Print(appName))
a.HelpFlag.Short('h') a.HelpFlag.Short('h')
a.Flag("agent", "Agent mode.").BoolVar(&agentMode)
a.Flag("config.file", "Prometheus configuration file path."). a.Flag("config.file", "Prometheus configuration file path.").
Default("prometheus.yml").StringVar(&cfg.configFile) Default("prometheus.yml").StringVar(&cfg.configFile)
@ -245,60 +285,105 @@ func main() {
Default(".*").StringVar(&cfg.corsRegexString) Default(".*").StringVar(&cfg.corsRegexString)
a.Flag("storage.tsdb.path", "Base path for metrics storage."). a.Flag("storage.tsdb.path", "Base path for metrics storage.").
PreAction(serverOnlySetting()).
Default("data/").StringVar(&cfg.localStoragePath) Default("data/").StringVar(&cfg.localStoragePath)
a.Flag("storage.tsdb.min-block-duration", "Minimum duration of a data block before being persisted. For use in testing."). a.Flag("storage.tsdb.min-block-duration", "Minimum duration of a data block before being persisted. For use in testing.").
PreAction(serverOnlySetting()).
Hidden().Default("2h").SetValue(&cfg.tsdb.MinBlockDuration) Hidden().Default("2h").SetValue(&cfg.tsdb.MinBlockDuration)
a.Flag("storage.tsdb.max-block-duration", a.Flag("storage.tsdb.max-block-duration",
"Maximum duration compacted blocks may span. For use in testing. (Defaults to 10% of the retention period.)"). "Maximum duration compacted blocks may span. For use in testing. (Defaults to 10% of the retention period.)").
PreAction(serverOnlySetting()).
Hidden().PlaceHolder("<duration>").SetValue(&cfg.tsdb.MaxBlockDuration) Hidden().PlaceHolder("<duration>").SetValue(&cfg.tsdb.MaxBlockDuration)
a.Flag("storage.tsdb.max-block-chunk-segment-size", a.Flag("storage.tsdb.max-block-chunk-segment-size",
"The maximum size for a single chunk segment in a block. Example: 512MB"). "The maximum size for a single chunk segment in a block. Example: 512MB").
PreAction(serverOnlySetting()).
Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.MaxBlockChunkSegmentSize) Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.MaxBlockChunkSegmentSize)
a.Flag("storage.tsdb.wal-segment-size", a.Flag("storage.tsdb.wal-segment-size",
"Size at which to split the tsdb WAL segment files. Example: 100MB"). "Size at which to split the tsdb WAL segment files. Example: 100MB").
PreAction(serverOnlySetting()).
Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.WALSegmentSize) Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.WALSegmentSize)
a.Flag("storage.tsdb.retention", "[DEPRECATED] How long to retain samples in storage. This flag has been deprecated, use \"storage.tsdb.retention.time\" instead."). a.Flag("storage.tsdb.retention", "[DEPRECATED] How long to retain samples in storage. This flag has been deprecated, use \"storage.tsdb.retention.time\" instead.").
PreAction(serverOnlySetting()).
SetValue(&oldFlagRetentionDuration) SetValue(&oldFlagRetentionDuration)
a.Flag("storage.tsdb.retention.time", "How long to retain samples in storage. When this flag is set it overrides \"storage.tsdb.retention\". If neither this flag nor \"storage.tsdb.retention\" nor \"storage.tsdb.retention.size\" is set, the retention time defaults to "+defaultRetentionString+". Units Supported: y, w, d, h, m, s, ms."). a.Flag("storage.tsdb.retention.time", "How long to retain samples in storage. When this flag is set it overrides \"storage.tsdb.retention\". If neither this flag nor \"storage.tsdb.retention\" nor \"storage.tsdb.retention.size\" is set, the retention time defaults to "+defaultRetentionString+". Units Supported: y, w, d, h, m, s, ms.").
PreAction(serverOnlySetting()).
SetValue(&newFlagRetentionDuration) SetValue(&newFlagRetentionDuration)
a.Flag("storage.tsdb.retention.size", "Maximum number of bytes that can be stored for blocks. A unit is required, supported units: B, KB, MB, GB, TB, PB, EB. Ex: \"512MB\"."). a.Flag("storage.tsdb.retention.size", "Maximum number of bytes that can be stored for blocks. A unit is required, supported units: B, KB, MB, GB, TB, PB, EB. Ex: \"512MB\".").
PreAction(serverOnlySetting()).
BytesVar(&cfg.tsdb.MaxBytes) BytesVar(&cfg.tsdb.MaxBytes)
a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory."). a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory.").
PreAction(serverOnlySetting()).
Default("false").BoolVar(&cfg.tsdb.NoLockfile) Default("false").BoolVar(&cfg.tsdb.NoLockfile)
a.Flag("storage.tsdb.allow-overlapping-blocks", "Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge."). a.Flag("storage.tsdb.allow-overlapping-blocks", "Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge.").
PreAction(serverOnlySetting()).
Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks) Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks)
a.Flag("storage.tsdb.wal-compression", "Compress the tsdb WAL."). a.Flag("storage.tsdb.wal-compression", "Compress the tsdb WAL.").
PreAction(serverOnlySetting()).
Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression) Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)
a.Flag("storage.agent.path", "Base path for metrics storage.").
PreAction(agentOnlySetting()).
Default("data-agent/").StringVar(&cfg.localStoragePath)
a.Flag("storage.agent.segment-size",
"Size at which to split WAL segment files. Example: 100MB").
PreAction(agentOnlySetting()).
Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.agent.WALSegmentSize)
a.Flag("storage.agent.compression", "Compress the agent WAL.").
PreAction(agentOnlySetting()).
Default("true").BoolVar(&cfg.agent.WALCompression)
a.Flag("storage.agent.truncate-frequency",
"The frequency at which to truncate the WAL and remove old data.").
PreAction(agentOnlySetting()).
Hidden().PlaceHolder("<duration>").SetValue(&cfg.agent.TruncateFrequency)
a.Flag("storage.agent.retention.min-time",
"Minimum age samples may be before being considered for deletion when the WAL is truncated").
PreAction(agentOnlySetting()).
SetValue(&cfg.agent.MinWALTime)
a.Flag("storage.agent.retention.max-time",
"Maximum age samples may be before being forcibly deleted when the WAL is truncated").
PreAction(agentOnlySetting()).
SetValue(&cfg.agent.MaxWALTime)
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline) Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types."). a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types.").
PreAction(serverOnlySetting()).
Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit) Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit)
a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit."). a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit.").
PreAction(serverOnlySetting()).
Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit) Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit)
a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well. 1MB as recommended by protobuf by default."). a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well. 1MB as recommended by protobuf by default.").
PreAction(serverOnlySetting()).
Default("1048576").IntVar(&cfg.web.RemoteReadBytesInFrame) Default("1048576").IntVar(&cfg.web.RemoteReadBytesInFrame)
a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert.").
PreAction(serverOnlySetting()).
Default("1h").SetValue(&cfg.outageTolerance) Default("1h").SetValue(&cfg.outageTolerance)
a.Flag("rules.alert.for-grace-period", "Minimum duration between alert and restored \"for\" state. This is maintained only for alerts with configured \"for\" time greater than grace period."). a.Flag("rules.alert.for-grace-period", "Minimum duration between alert and restored \"for\" state. This is maintained only for alerts with configured \"for\" time greater than grace period.").
PreAction(serverOnlySetting()).
Default("10m").SetValue(&cfg.forGracePeriod) Default("10m").SetValue(&cfg.forGracePeriod)
a.Flag("rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). a.Flag("rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
PreAction(serverOnlySetting()).
Default("1m").SetValue(&cfg.resendDelay) Default("1m").SetValue(&cfg.resendDelay)
a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").
@ -308,21 +393,26 @@ func main() {
Hidden().Default("2ms").DurationVar(&scrape.ScrapeTimestampTolerance) Hidden().Default("2ms").DurationVar(&scrape.ScrapeTimestampTolerance)
a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
PreAction(serverOnlySetting()).
Default("10000").IntVar(&cfg.notifier.QueueCapacity) Default("10000").IntVar(&cfg.notifier.QueueCapacity)
// TODO: Remove in Prometheus 3.0. // TODO: Remove in Prometheus 3.0.
alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String() alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String()
a.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations and federation."). a.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations and federation.").
PreAction(serverOnlySetting()).
Default("5m").SetValue(&cfg.lookbackDelta) Default("5m").SetValue(&cfg.lookbackDelta)
a.Flag("query.timeout", "Maximum time a query may take before being aborted."). a.Flag("query.timeout", "Maximum time a query may take before being aborted.").
PreAction(serverOnlySetting()).
Default("2m").SetValue(&cfg.queryTimeout) Default("2m").SetValue(&cfg.queryTimeout)
a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently."). a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently.").
PreAction(serverOnlySetting()).
Default("20").IntVar(&cfg.queryConcurrency) Default("20").IntVar(&cfg.queryConcurrency)
a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
PreAction(serverOnlySetting()).
Default("50000000").IntVar(&cfg.queryMaxSamples) Default("50000000").IntVar(&cfg.queryMaxSamples)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver, extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver, extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
@ -390,7 +480,8 @@ func main() {
// RoutePrefix must always be at least '/'. // RoutePrefix must always be at least '/'.
cfg.web.RoutePrefix = "/" + strings.Trim(cfg.web.RoutePrefix, "/") cfg.web.RoutePrefix = "/" + strings.Trim(cfg.web.RoutePrefix, "/")
{ // Time retention settings. if !agentMode {
// Time retention settings.
if oldFlagRetentionDuration != 0 { if oldFlagRetentionDuration != 0 {
level.Warn(logger).Log("deprecation_notice", "'storage.tsdb.retention' flag is deprecated use 'storage.tsdb.retention.time' instead.") level.Warn(logger).Log("deprecation_notice", "'storage.tsdb.retention' flag is deprecated use 'storage.tsdb.retention.time' instead.")
cfg.tsdb.RetentionDuration = oldFlagRetentionDuration cfg.tsdb.RetentionDuration = oldFlagRetentionDuration
@ -415,9 +506,8 @@ func main() {
cfg.tsdb.RetentionDuration = y cfg.tsdb.RetentionDuration = y
level.Warn(logger).Log("msg", "Time retention value is too high. Limiting to: "+y.String()) level.Warn(logger).Log("msg", "Time retention value is too high. Limiting to: "+y.String())
} }
}
{ // Max block size settings. // Max block size settings.
if cfg.tsdb.MaxBlockDuration == 0 { if cfg.tsdb.MaxBlockDuration == 0 {
maxBlockDuration, err := model.ParseDuration("31d") maxBlockDuration, err := model.ParseDuration("31d")
if err != nil { if err != nil {
@ -483,7 +573,12 @@ func main() {
var ( var (
scrapeManager = scrape.NewManager(&cfg.scrape, log.With(logger, "component", "scrape manager"), fanoutStorage) scrapeManager = scrape.NewManager(&cfg.scrape, log.With(logger, "component", "scrape manager"), fanoutStorage)
opts = promql.EngineOpts{ queryEngine *promql.Engine
ruleManager *rules.Manager
)
if !agentMode {
opts := promql.EngineOpts{
Logger: log.With(logger, "component", "query engine"), Logger: log.With(logger, "component", "query engine"),
Reg: prometheus.DefaultRegisterer, Reg: prometheus.DefaultRegisterer,
MaxSamples: cfg.queryMaxSamples, MaxSamples: cfg.queryMaxSamples,
@ -510,7 +605,7 @@ func main() {
ForGracePeriod: time.Duration(cfg.forGracePeriod), ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay), ResendDelay: time.Duration(cfg.resendDelay),
}) })
) }
scraper.Set(scrapeManager) scraper.Set(scrapeManager)
@ -526,6 +621,7 @@ func main() {
cfg.web.RuleManager = ruleManager cfg.web.RuleManager = ruleManager
cfg.web.Notifier = notifierManager cfg.web.Notifier = notifierManager
cfg.web.LookbackDelta = time.Duration(cfg.lookbackDelta) cfg.web.LookbackDelta = time.Duration(cfg.lookbackDelta)
cfg.web.IsAgent = agentMode
cfg.web.Version = &web.PrometheusVersion{ cfg.web.Version = &web.PrometheusVersion{
Version: version.Version, Version: version.Version,
@ -572,6 +668,11 @@ func main() {
}, { }, {
name: "query_engine", name: "query_engine",
reloader: func(cfg *config.Config) error { reloader: func(cfg *config.Config) error {
if agentMode {
// No-op in Agent mode.
return nil
}
if cfg.GlobalConfig.QueryLogFile == "" { if cfg.GlobalConfig.QueryLogFile == "" {
queryEngine.SetQueryLogger(nil) queryEngine.SetQueryLogger(nil)
return nil return nil
@ -613,6 +714,11 @@ func main() {
}, { }, {
name: "rules", name: "rules",
reloader: func(cfg *config.Config) error { reloader: func(cfg *config.Config) error {
if agentMode {
// No-op in Agent mode
return nil
}
// Get all rule files matching the configuration paths. // Get all rule files matching the configuration paths.
var files []string var files []string
for _, pat := range cfg.RuleFiles { for _, pat := range cfg.RuleFiles {
@ -817,7 +923,7 @@ func main() {
}, },
) )
} }
{ if !agentMode {
// Rule manager. // Rule manager.
g.Add( g.Add(
func() error { func() error {
@ -829,8 +935,7 @@ func main() {
ruleManager.Stop() ruleManager.Stop()
}, },
) )
}
{
// TSDB. // TSDB.
opts := cfg.tsdb.ToTSDBOptions() opts := cfg.tsdb.ToTSDBOptions()
cancel := make(chan struct{}) cancel := make(chan struct{})
@ -892,6 +997,59 @@ func main() {
}, },
) )
} }
if agentMode {
// WAL storage.
opts := cfg.agent.ToAgentOptions()
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "Starting WAL storage ...")
if cfg.agent.WALSegmentSize != 0 {
if cfg.agent.WALSegmentSize < 10*1024*1024 || cfg.agent.WALSegmentSize > 256*1024*1024 {
return errors.New("flag 'storage.agent.segment-size' must be set between 10MB and 256MB")
}
}
db, err := agent.Open(
logger,
prometheus.DefaultRegisterer,
remoteStorage,
cfg.localStoragePath,
&opts,
)
if err != nil {
return errors.Wrap(err, "opening storage failed")
}
switch fsType := prom_runtime.Statfs(cfg.localStoragePath); fsType {
case "NFS_SUPER_MAGIC":
level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
default:
level.Info(logger).Log("fs_type", fsType)
}
level.Info(logger).Log("msg", "Agent WAL storage started")
level.Debug(logger).Log("msg", "Agent WAL storage options",
"WALSegmentSize", cfg.agent.WALSegmentSize,
"WALCompression", cfg.agent.WALCompression,
"StripeSize", cfg.agent.StripeSize,
"TruncateFrequency", cfg.agent.TruncateFrequency,
"MinWALTime", cfg.agent.MinWALTime,
"MaxWALTime", cfg.agent.MaxWALTime,
)
localStorage.Set(db, 0)
close(dbOpen)
<-cancel
return nil
},
func(e error) {
if err := fanoutStorage.Close(); err != nil {
level.Error(logger).Log("msg", "Error stopping storage", "err", err)
}
close(cancel)
},
)
}
{ {
// Web handler. // Web handler.
g.Add( g.Add(
@ -1015,6 +1173,25 @@ func reloadConfig(filename string, expandExternalLabels bool, enableExemplarStor
} }
} }
// Perform validation for Agent-compatible configs and remove anything that's unsupported.
if agentMode {
// Perform validation for Agent-compatible configs and remove anything that's
// unsupported.
if len(conf.AlertingConfig.AlertRelabelConfigs) > 0 || len(conf.AlertingConfig.AlertmanagerConfigs) > 0 {
level.Warn(logger).Log("msg", "alerting configs not supported in agent mode")
conf.AlertingConfig.AlertRelabelConfigs = []*relabel.Config{}
conf.AlertingConfig.AlertmanagerConfigs = config.AlertmanagerConfigs{}
}
if len(conf.RuleFiles) > 0 {
level.Warn(logger).Log("msg", "recording rules not supported in agent mode")
conf.RuleFiles = []string{}
}
if len(conf.RemoteReadConfigs) > 0 {
level.Warn(logger).Log("msg", "remote_read configs not supported in agent mode")
conf.RemoteReadConfigs = []*config.RemoteReadConfig{}
}
}
failed := false failed := false
for _, rl := range rls { for _, rl := range rls {
rstart := time.Now() rstart := time.Now()
@ -1115,18 +1292,21 @@ func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
// storage at a later point in time. // storage at a later point in time.
type readyStorage struct { type readyStorage struct {
mtx sync.RWMutex mtx sync.RWMutex
db *tsdb.DB db storage.Storage
startTimeMargin int64 startTimeMargin int64
stats *tsdb.DBStats stats *tsdb.DBStats
} }
func (s *readyStorage) ApplyConfig(conf *config.Config) error { func (s *readyStorage) ApplyConfig(conf *config.Config) error {
db := s.get() db := s.get()
if db, ok := db.(*tsdb.DB); ok {
return db.ApplyConfig(conf) return db.ApplyConfig(conf)
} }
return nil
}
// Set the storage. // Set the storage.
func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) { func (s *readyStorage) Set(db storage.Storage, startTimeMargin int64) {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
@ -1134,7 +1314,7 @@ func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
s.startTimeMargin = startTimeMargin s.startTimeMargin = startTimeMargin
} }
func (s *readyStorage) get() *tsdb.DB { func (s *readyStorage) get() storage.Storage {
s.mtx.RLock() s.mtx.RLock()
x := s.db x := s.db
s.mtx.RUnlock() s.mtx.RUnlock()
@ -1151,15 +1331,21 @@ func (s *readyStorage) getStats() *tsdb.DBStats {
// StartTime implements the Storage interface. // StartTime implements the Storage interface.
func (s *readyStorage) StartTime() (int64, error) { func (s *readyStorage) StartTime() (int64, error) {
if x := s.get(); x != nil { if x := s.get(); x != nil {
switch db := x.(type) {
case *tsdb.DB:
var startTime int64 var startTime int64
if len(db.Blocks()) > 0 {
if len(x.Blocks()) > 0 { startTime = db.Blocks()[0].Meta().MinTime
startTime = x.Blocks()[0].Meta().MinTime
} else { } else {
startTime = time.Now().Unix() * 1000 startTime = time.Now().Unix() * 1000
} }
// Add a safety margin as it may take a few minutes for everything to spin up. // Add a safety margin as it may take a few minutes for everything to spin up.
return startTime + s.startTimeMargin, nil return startTime + s.startTimeMargin, nil
case *agent.DB:
return db.StartTime()
default:
panic(fmt.Sprintf("unkown storage type %T", db))
}
} }
return math.MaxInt64, tsdb.ErrNotReady return math.MaxInt64, tsdb.ErrNotReady
@ -1183,7 +1369,14 @@ func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (stor
func (s *readyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { func (s *readyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
if x := s.get(); x != nil { if x := s.get(); x != nil {
return x.ExemplarQuerier(ctx) switch db := x.(type) {
case *tsdb.DB:
return db.ExemplarQuerier(ctx)
case *agent.DB:
return nil, agent.ErrUnsupported
default:
panic(fmt.Sprintf("unknown storage type %T", db))
}
} }
return nil, tsdb.ErrNotReady return nil, tsdb.ErrNotReady
} }
@ -1221,7 +1414,14 @@ func (s *readyStorage) Close() error {
// CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. // CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) CleanTombstones() error { func (s *readyStorage) CleanTombstones() error {
if x := s.get(); x != nil { if x := s.get(); x != nil {
return x.CleanTombstones() switch db := x.(type) {
case *tsdb.DB:
return db.CleanTombstones()
case *agent.DB:
return agent.ErrUnsupported
default:
panic(fmt.Sprintf("unknown storage type %T", db))
}
} }
return tsdb.ErrNotReady return tsdb.ErrNotReady
} }
@ -1229,7 +1429,14 @@ func (s *readyStorage) CleanTombstones() error {
// Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. // Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error { func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
if x := s.get(); x != nil { if x := s.get(); x != nil {
return x.Delete(mint, maxt, ms...) switch db := x.(type) {
case *tsdb.DB:
return db.Delete(mint, maxt, ms...)
case *agent.DB:
return agent.ErrUnsupported
default:
panic(fmt.Sprintf("unknown storage type %T", db))
}
} }
return tsdb.ErrNotReady return tsdb.ErrNotReady
} }
@ -1237,7 +1444,14 @@ func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
// Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. // Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Snapshot(dir string, withHead bool) error { func (s *readyStorage) Snapshot(dir string, withHead bool) error {
if x := s.get(); x != nil { if x := s.get(); x != nil {
return x.Snapshot(dir, withHead) switch db := x.(type) {
case *tsdb.DB:
return db.Snapshot(dir, withHead)
case *agent.DB:
return agent.ErrUnsupported
default:
panic(fmt.Sprintf("unknown storage type %T", db))
}
} }
return tsdb.ErrNotReady return tsdb.ErrNotReady
} }
@ -1245,7 +1459,14 @@ func (s *readyStorage) Snapshot(dir string, withHead bool) error {
// Stats implements the api_v1.TSDBAdminStats interface. // Stats implements the api_v1.TSDBAdminStats interface.
func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) { func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) {
if x := s.get(); x != nil { if x := s.get(); x != nil {
return x.Head().Stats(statsByLabelName), nil switch db := x.(type) {
case *tsdb.DB:
return db.Head().Stats(statsByLabelName), nil
case *agent.DB:
return nil, agent.ErrUnsupported
default:
panic(fmt.Sprintf("unknown storage type %T", db))
}
} }
return nil, tsdb.ErrNotReady return nil, tsdb.ErrNotReady
} }
@ -1323,6 +1544,27 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
} }
} }
// agentOptions is a version of agent.Options with defined units. This is required
// as agent.Option fields are unit agnostic (time).
type agentOptions struct {
WALSegmentSize units.Base2Bytes
WALCompression bool
StripeSize int
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
}
func (opts agentOptions) ToAgentOptions() agent.Options {
return agent.Options{
WALSegmentSize: int(opts.WALSegmentSize),
WALCompression: opts.WALCompression,
StripeSize: opts.StripeSize,
TruncateFrequency: time.Duration(opts.TruncateFrequency),
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
}
}
func initTracing(logger log.Logger) (io.Closer, error) { func initTracing(logger log.Logger) (io.Closer, error) {
// Set tracing configuration defaults. // Set tracing configuration defaults.
cfg := &jcfg.Configuration{ cfg := &jcfg.Configuration{

View file

@ -347,3 +347,23 @@ func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames
} }
return res return res
} }
func TestAgentSuccessfulStartup(t *testing.T) {
prom := exec.Command(promPath, "-test.main", "--agent", "--config.file="+promConfig)
err := prom.Start()
require.NoError(t, err)
expectedExitStatus := 0
actualExitStatus := 0
done := make(chan error, 1)
go func() { done <- prom.Wait() }()
select {
case err := <-done:
t.Logf("prometheus agent should be still running: %v", err)
actualExitStatus = prom.ProcessState.ExitCode()
case <-time.After(5 * time.Second):
prom.Process.Kill()
}
require.Equal(t, expectedExitStatus, actualExitStatus)
}

View file

@ -179,6 +179,11 @@ func (s *Storage) Appender(ctx context.Context) storage.Appender {
return s.rws.Appender(ctx) return s.rws.Appender(ctx)
} }
// LowestSentTimestamp returns the lowest sent timestamp across all queues.
func (s *Storage) LowestSentTimestamp() int64 {
return s.rws.LowestSentTimestamp()
}
// Close the background processing of the storage queues. // Close the background processing of the storage queues.
func (s *Storage) Close() error { func (s *Storage) Close() error {
s.mtx.Lock() s.mtx.Lock()

View file

@ -16,6 +16,7 @@ package remote
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"sync" "sync"
"time" "time"
@ -207,6 +208,26 @@ func (rws *WriteStorage) Appender(_ context.Context) storage.Appender {
} }
} }
// LowestSentTimestamp returns the lowest sent timestamp across all queues.
func (rws *WriteStorage) LowestSentTimestamp() int64 {
rws.mtx.Lock()
defer rws.mtx.Unlock()
var lowestTs int64 = math.MaxInt64
for _, q := range rws.queues {
ts := int64(q.metrics.highestSentTimestamp.Get() * 1000)
if ts < lowestTs {
lowestTs = ts
}
}
if len(rws.queues) == 0 {
lowestTs = 0
}
return lowestTs
}
// Close closes the WriteStorage. // Close closes the WriteStorage.
func (rws *WriteStorage) Close() error { func (rws *WriteStorage) Close() error {
rws.mtx.Lock() rws.mtx.Lock()

761
tsdb/agent/db.go Normal file
View file

@ -0,0 +1,761 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
"go.uber.org/atomic"
)
var (
ErrUnsupported = errors.New("unsupported operation with WAL-only storage")
)
// Default values for options.
var (
DefaultTruncateFrequency = 2 * time.Hour
DefaultMinWALTime = int64(5 * time.Minute / time.Millisecond)
DefaultMaxWALTime = int64(4 * time.Hour / time.Millisecond)
)
// Options of the WAL storage.
type Options struct {
// Segments (wal files) max size.
// WALSegmentSize <= 0, segment size is default size.
// WALSegmentSize > 0, segment size is WALSegmentSize.
WALSegmentSize int
// WALCompression will turn on Snappy compression for records on the WAL.
WALCompression bool
// StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance.
StripeSize int
// TruncateFrequency determines how frequently to truncate data from the WAL.
TruncateFrequency time.Duration
// Shortest and longest amount of time data can exist in the WAL before being
// deleted.
MinWALTime, MaxWALTime int64
}
// DefaultOptions used for the WAL storage. They are sane for setups using
// millisecond-precision timestamps.
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wal.DefaultSegmentSize,
WALCompression: false,
StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
}
}
type dbMetrics struct {
r prometheus.Registerer
numActiveSeries prometheus.Gauge
numWALSeriesPendingDeletion prometheus.Gauge
totalAppendedSamples prometheus.Counter
walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter
walTotalReplayDuration prometheus.Gauge
checkpointDeleteFail prometheus.Counter
checkpointDeleteTotal prometheus.Counter
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
}
func newDBMetrics(r prometheus.Registerer) *dbMetrics {
m := dbMetrics{r: r}
m.numActiveSeries = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_agent_active_series",
Help: "Number of active series being tracked by the WAL storage",
})
m.numWALSeriesPendingDeletion = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_agent_deleted_series",
Help: "Number of series pending deletion from the WAL",
})
m.totalAppendedSamples = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_samples_appended_total",
Help: "Total number of samples appended to the storage",
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_agent_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
})
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_corruptions_total",
Help: "Total number of WAL corruptions.",
})
m.walTotalReplayDuration = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_agent_data_replay_duration_seconds",
Help: "Time taken to replay the data on disk.",
})
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
})
m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_checkpoint_deletions_total",
Help: "Total number of checkpoint deletions attempted.",
})
m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_checkpoint_creations_failed_total",
Help: "Total number of checkpoint creations that failed.",
})
m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.",
})
if r != nil {
r.MustRegister(
m.numActiveSeries,
m.numWALSeriesPendingDeletion,
m.totalAppendedSamples,
m.walTruncateDuration,
m.walCorruptionsTotal,
m.walTotalReplayDuration,
m.checkpointDeleteFail,
m.checkpointDeleteTotal,
m.checkpointCreationFail,
m.checkpointCreationTotal,
)
}
return &m
}
func (m *dbMetrics) Unregister() {
if m.r == nil {
return
}
cs := []prometheus.Collector{
m.numActiveSeries,
m.numWALSeriesPendingDeletion,
m.totalAppendedSamples,
}
for _, c := range cs {
m.r.Unregister(c)
}
}
// DB represents a WAL-only storage. It implements storage.DB.
type DB struct {
mtx sync.RWMutex
logger log.Logger
opts *Options
rs *remote.Storage
wal *wal.WAL
appenderPool sync.Pool
bufPool sync.Pool
nextRef *atomic.Uint64
series *stripeSeries
// deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
// must be kept around to).
deleted map[uint64]int
donec chan struct{}
stopc chan struct{}
metrics *dbMetrics
}
// Open returns a new agent.DB in the given directory.
func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) {
opts = validateOptions(opts)
// remote_write expects WAL to be stored in a "wal" subdirectory of the main storage.
dir = filepath.Join(dir, "wal")
w, err := wal.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
if err != nil {
return nil, errors.Wrap(err, "creating WAL")
}
db := &DB{
logger: l,
opts: opts,
rs: rs,
wal: w,
nextRef: atomic.NewUint64(0),
series: newStripeSeries(opts.StripeSize),
deleted: make(map[uint64]int),
donec: make(chan struct{}),
stopc: make(chan struct{}),
metrics: newDBMetrics(reg),
}
db.bufPool.New = func() interface{} {
return make([]byte, 0, 1024)
}
db.appenderPool.New = func() interface{} {
return &appender{
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
}
}
if err := db.replayWAL(); err != nil {
level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)
if err := w.Repair(err); err != nil {
return nil, errors.Wrap(err, "repair corrupted WAL")
}
}
go db.run()
return db, nil
}
func validateOptions(opts *Options) *Options {
if opts == nil {
opts = DefaultOptions()
}
if opts.WALSegmentSize <= 0 {
opts.WALSegmentSize = wal.DefaultSegmentSize
}
// Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2.
if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) {
opts.StripeSize = tsdb.DefaultStripeSize
}
if opts.TruncateFrequency <= 0 {
opts.TruncateFrequency = DefaultTruncateFrequency
}
if opts.MinWALTime <= 0 {
opts.MinWALTime = 0
}
if opts.MaxWALTime <= 0 {
opts.MaxWALTime = DefaultMaxWALTime
}
if t := int64(opts.TruncateFrequency * time.Hour / time.Millisecond); opts.MaxWALTime < t {
opts.MaxWALTime = t
}
return opts
}
func (db *DB) replayWAL() error {
level.Info(db.logger).Log("msg", "replaying WAL, this may take a while", "dir", db.wal.Dir())
start := time.Now()
dir, startFrom, err := wal.LastCheckpoint(db.wal.Dir())
if err != nil && err != record.ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
}
multiRef := map[uint64]uint64{}
if err == nil {
sr, err := wal.NewSegmentsReader(dir)
if err != nil {
return errors.Wrap(err, "open checkpoint")
}
defer func() {
if err := sr.Close(); err != nil {
level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
}()
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := db.loadWAL(wal.NewReader(sr), multiRef); err != nil {
return errors.Wrap(err, "backfill checkpoint")
}
startFrom++
level.Info(db.logger).Log("msg", "WAL checkpoint loaded")
}
// Find the last segment.
_, last, err := wal.Segments(db.wal.Dir())
if err != nil {
return errors.Wrap(err, "finding WAL segments")
}
// Backfil segments from the most recent checkpoint onwards.
for i := startFrom; i <= last; i++ {
seg, err := wal.OpenReadSegment(wal.SegmentName(db.wal.Dir(), i))
if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
}
sr := wal.NewSegmentBufReader(seg)
err = db.loadWAL(wal.NewReader(sr), multiRef)
if err := sr.Close(); err != nil {
level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
if err != nil {
return err
}
level.Info(db.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
}
walReplayDuration := time.Since(start)
db.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds())
return nil
}
func (db *DB) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
var (
dec record.Decoder
lastRef uint64
decoded = make(chan interface{}, 10)
errCh = make(chan error, 1)
seriesPool = sync.Pool{
New: func() interface{} {
return []record.RefSeries{}
},
}
samplesPool = sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
)
go func() {
defer close(decoded)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get().([]record.RefSeries)[:0]
series, err = dec.Series(rec, series)
if err != nil {
errCh <- &wal.CorruptionErr{
Err: errors.Wrap(err, "decode series"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- series
case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
errCh <- &wal.CorruptionErr{
Err: errors.Wrap(err, "decode samples"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- samples
case record.Tombstones:
// We don't care about tombstones
continue
case record.Exemplars:
// We don't care about exemplars
continue
default:
errCh <- &wal.CorruptionErr{
Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
Segment: r.Segment(),
Offset: r.Offset(),
}
}
}
}()
var nonExistentSeriesRefs atomic.Uint64
for d := range decoded {
switch v := d.(type) {
case []record.RefSeries:
for _, entry := range v {
// If this is a new series, create it in memory. If we never read in a
// sample for this series, its timestamp will remain at 0 and it will
// be deleted at the next GC.
if db.series.GetByID(entry.Ref) == nil {
series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0}
db.series.Set(entry.Labels.Hash(), series)
multiRef[entry.Ref] = series.ref
db.metrics.numActiveSeries.Inc()
if entry.Ref > lastRef {
lastRef = entry.Ref
}
}
}
//nolint:staticcheck
seriesPool.Put(v)
case []record.RefSample:
for _, entry := range v {
// Update the lastTs for the series based
ref, ok := multiRef[entry.Ref]
if !ok {
nonExistentSeriesRefs.Inc()
continue
}
series := db.series.GetByID(ref)
if entry.T > series.lastTs {
series.lastTs = entry.T
}
}
//nolint:staticcheck
samplesPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
}
if v := nonExistentSeriesRefs.Load(); v > 0 {
level.Warn(db.logger).Log("msg", "found sample referencing non-existing series", "skipped_series", v)
}
db.nextRef.Store(lastRef)
select {
case err := <-errCh:
return err
default:
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
return nil
}
}
func (db *DB) run() {
defer close(db.donec)
Loop:
for {
select {
case <-db.stopc:
break Loop
case <-time.After(db.opts.TruncateFrequency):
// The timestamp ts is used to determine which series are not receiving
// samples and may be deleted from the WAL. Their most recent append
// timestamp is compared to ts, and if that timestamp is older then ts,
// they are considered inactive and may be deleted.
//
// Subtracting a duration from ts will add a buffer for when series are
// considered inactive and safe for deletion.
ts := db.rs.LowestSentTimestamp() - db.opts.MinWALTime
if ts < 0 {
ts = 0
}
// Network issues can prevent the result of getRemoteWriteTimestamp from
// changing. We don't want data in the WAL to grow forever, so we set a cap
// on the maximum age data can be. If our ts is older than this cutoff point,
// we'll shift it forward to start deleting very stale data.
if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS {
ts = maxTS
}
level.Debug(db.logger).Log("msg", "truncating the WAL", "ts", ts)
if err := db.truncate(ts); err != nil {
level.Warn(db.logger).Log("msg", "failed to truncate WAL", "err", err)
}
}
}
}
func (db *DB) truncate(mint int64) error {
db.mtx.RLock()
defer db.mtx.RUnlock()
start := time.Now()
db.gc(mint)
level.Info(db.logger).Log("msg", "series GC completed", "duration", time.Since(start))
first, last, err := wal.Segments(db.wal.Dir())
if err != nil {
return errors.Wrap(err, "get segment range")
}
// Start a new segment so low ingestion volume instances don't have more WAL
// than needed.
err = db.wal.NextSegment()
if err != nil {
return errors.Wrap(err, "next segment")
}
last-- // Never consider most recent segment for checkpoint
if last < 0 {
return nil // no segments yet
}
// The lower two-thirds of segments should contain mostly obsolete samples.
// If we have less than two segments, it's not worth checkpointing yet.
last = first + (last-first)*2/3
if last <= first {
return nil
}
keep := func(id uint64) bool {
if db.series.GetByID(id) != nil {
return true
}
seg, ok := db.deleted[id]
return ok && seg >= first
}
db.metrics.checkpointCreationTotal.Inc()
if _, err = wal.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
db.metrics.checkpointCreationFail.Inc()
if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok {
db.metrics.walCorruptionsTotal.Inc()
}
return errors.Wrap(err, "create checkpoint")
}
if err := db.wal.Truncate(last + 1); err != nil {
// If truncating fails, we'll just try it again at the next checkpoint.
// Leftover segments will still just be ignored in the future if there's a
// checkpoint that supersedes them.
level.Error(db.logger).Log("msg", "truncating segments failed", "err", err)
}
// The checkpoint is written and segments before it are truncated, so we
// no longer need to track deleted series that were being kept around.
for ref, segment := range db.deleted {
if segment < first {
delete(db.deleted, ref)
}
}
db.metrics.checkpointDeleteTotal.Inc()
db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted)))
if err := wal.DeleteCheckpoints(db.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space. They will just be ignored since a newer checkpoint
// exists.
level.Error(db.logger).Log("msg", "delete old checkpoints", "err", err)
db.metrics.checkpointDeleteFail.Inc()
}
db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
level.Info(db.logger).Log("msg", "WAL checkpoint complete", "first", first, "last", last, "duration", time.Since(start))
return nil
}
// gc marks ref IDs that have not received a sample since mint as deleted in
// s.deleted, along with the segment where they originally got deleted.
func (db *DB) gc(mint int64) {
deleted := db.series.GC(mint)
db.metrics.numActiveSeries.Sub(float64(len(deleted)))
_, last, _ := wal.Segments(db.wal.Dir())
// We want to keep series records for any newly deleted series
// until we've passed the last recorded segment. This prevents
// the WAL having samples for series records that no longer exist.
for ref := range deleted {
db.deleted[ref] = last
}
db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted)))
}
// StartTime implements the Storage interface.
func (db *DB) StartTime() (int64, error) {
return int64(model.Latest), nil
}
// Querier implements the Storage interface.
func (db *DB) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return nil, ErrUnsupported
}
// ChunkQuerier implements the Storage interface.
func (db *DB) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
return nil, ErrUnsupported
}
// ExemplarQuerier implements the Storage interface.
func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
return nil, ErrUnsupported
}
// Appender implements storage.Storage.
func (db *DB) Appender(_ context.Context) storage.Appender {
return db.appenderPool.Get().(storage.Appender)
}
// Close implements the Storage interface.
func (db *DB) Close() error {
db.mtx.Lock()
defer db.mtx.Unlock()
close(db.stopc)
<-db.donec
db.metrics.Unregister()
return db.wal.Close()
}
type appender struct {
*DB
pendingSeries []record.RefSeries
pendingSamples []record.RefSample
}
func (a *appender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) {
if ref == 0 {
return a.Add(l, t, v)
}
return ref, a.AddFast(ref, t, v)
}
func (a *appender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
hash := l.Hash()
series := a.series.GetByHash(hash, l)
if series != nil {
return series.ref, a.AddFast(series.ref, t, v)
}
// Ensure no empty or duplicate labels have gotten through. This mirrors the
// equivalent validation code in the TSDB's headAppender.
l = l.WithoutEmpty()
if len(l) == 0 {
return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset")
}
if lbl, dup := l.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl))
}
ref := a.nextRef.Inc()
series = &memSeries{ref: ref, lset: l, lastTs: t}
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: ref,
Labels: l,
})
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: ref,
T: t,
V: v,
})
a.series.Set(hash, series)
a.metrics.numActiveSeries.Inc()
a.metrics.totalAppendedSamples.Inc()
return series.ref, nil
}
func (a *appender) AddFast(ref uint64, t int64, v float64) error {
series := a.series.GetByID(ref)
if series == nil {
return storage.ErrNotFound
}
series.Lock()
defer series.Unlock()
// Update last recorded timestamp. Used by Storage.gc to determine if a
// series is dead.
series.lastTs = t
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: ref,
T: t,
V: v,
})
a.metrics.totalAppendedSamples.Inc()
return nil
}
func (a *appender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
// remote_write doesn't support exemplars yet, so do nothing here.
return 0, nil
}
// Commit submits the collected samples and purges the batch.
func (a *appender) Commit() error {
a.mtx.RLock()
defer a.mtx.RUnlock()
var encoder record.Encoder
buf := a.bufPool.Get().([]byte)
if len(a.pendingSeries) > 0 {
buf = encoder.Series(a.pendingSeries, buf)
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
if len(a.pendingSamples) > 0 {
buf = encoder.Samples(a.pendingSamples, buf)
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
//nolint:staticcheck
a.bufPool.Put(buf)
return a.Rollback()
}
func (a *appender) Rollback() error {
a.pendingSeries = a.pendingSeries[:0]
a.pendingSamples = a.pendingSamples[:0]
a.appenderPool.Put(a)
return nil
}

449
tsdb/agent/db_test.go Normal file
View file

@ -0,0 +1,449 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"context"
"io/ioutil"
"os"
"strconv"
"sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/stretchr/testify/require"
)
func TestUnsupported(t *testing.T) {
promAgentDir, err := ioutil.TempDir("", "TestUnsupported")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(promAgentDir))
})
opts := DefaultOptions()
logger := log.NewNopLogger()
s, err := Open(logger, prometheus.DefaultRegisterer, nil, promAgentDir, opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
defer s.Close()
t.Run("Querier", func(t *testing.T) {
_, err := s.Querier(context.TODO(), 0, 0)
require.Equal(t, err, ErrUnsupported)
})
t.Run("ChunkQuerier", func(t *testing.T) {
_, err := s.ChunkQuerier(context.TODO(), 0, 0)
require.Equal(t, err, ErrUnsupported)
})
t.Run("ExemplarQuerier", func(t *testing.T) {
_, err := s.ExemplarQuerier(context.TODO())
require.Equal(t, err, ErrUnsupported)
})
}
func TestCommit(t *testing.T) {
const (
numDatapoints = 1000
numSeries = 8
)
promAgentDir, err := ioutil.TempDir("", t.Name())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(promAgentDir))
})
lbls := labelsForTest(t.Name(), numSeries)
opts := DefaultOptions()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
a := s.Appender(context.TODO())
for _, l := range lbls {
lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ {
sample := tsdbutil.GenerateSamples(0, 1)
_, err := a.Append(0, lset, sample[0].T(), sample[0].V())
require.NoError(t, err)
}
require.NoError(t, a.Commit())
}
// Read records from WAL and check for expected count of series and samples.
walSeriesCount := 0
walSamplesCount := 0
reg = prometheus.NewRegistry()
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
s, err = Open(logger, nil, remoteStorage, promAgentDir, opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
var dec record.Decoder
if err == nil {
sr, err := wal.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err)
r := wal.NewReader(sr)
seriesPool := sync.Pool{
New: func() interface{} {
return []record.RefSeries{}
},
}
samplesPool := sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get().([]record.RefSeries)[:0]
series, _ = dec.Series(rec, series)
walSeriesCount += len(series)
case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0]
samples, _ = dec.Samples(rec, samples)
walSamplesCount += len(samples)
default:
}
}
}
// Retrieved series count from WAL should match the count of series been added to the WAL.
require.Equal(t, walSeriesCount, numSeries)
// Retrieved samples count from WAL should match the count of samples been added to the WAL.
require.Equal(t, walSamplesCount, numSeries*numDatapoints)
}
func TestRollback(t *testing.T) {
const (
numDatapoints = 1000
numSeries = 8
)
promAgentDir, err := ioutil.TempDir("", t.Name())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(promAgentDir))
})
lbls := labelsForTest(t.Name(), numSeries)
opts := DefaultOptions()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
a := s.Appender(context.TODO())
for _, l := range lbls {
lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ {
sample := tsdbutil.GenerateSamples(0, 1)
_, err := a.Append(0, lset, sample[0].T(), sample[0].V())
require.NoError(t, err)
}
}
require.NoError(t, a.Rollback())
// Read records from WAL and check for expected count of series and samples.
walSeriesCount := 0
walSamplesCount := 0
reg = prometheus.NewRegistry()
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
s, err = Open(logger, nil, remoteStorage, promAgentDir, opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
var dec record.Decoder
if err == nil {
sr, err := wal.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err)
r := wal.NewReader(sr)
seriesPool := sync.Pool{
New: func() interface{} {
return []record.RefSeries{}
},
}
samplesPool := sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get().([]record.RefSeries)[:0]
series, _ = dec.Series(rec, series)
walSeriesCount += len(series)
case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0]
samples, _ = dec.Samples(rec, samples)
walSamplesCount += len(samples)
default:
}
}
}
// Retrieved series count from WAL should be zero.
require.Equal(t, walSeriesCount, 0)
// Retrieved samples count from WAL should be zero.
require.Equal(t, walSamplesCount, 0)
}
func TestFullTruncateWAL(t *testing.T) {
const (
numDatapoints = 1000
numSeries = 800
lastTs = 500
)
promAgentDir, err := ioutil.TempDir("", t.Name())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(promAgentDir))
})
lbls := labelsForTest(t.Name(), numSeries)
opts := DefaultOptions()
opts.TruncateFrequency = time.Minute * 2
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
a := s.Appender(context.TODO())
for _, l := range lbls {
lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ {
_, err := a.Append(0, lset, int64(lastTs), 0)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
}
// Truncate WAL with mint to GC all the samples.
s.truncate(lastTs + 1)
m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
}
func TestPartialTruncateWAL(t *testing.T) {
const (
numDatapoints = 1000
numSeries = 800
)
promAgentDir, err := ioutil.TempDir("", t.Name())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(promAgentDir))
})
opts := DefaultOptions()
opts.TruncateFrequency = time.Minute * 2
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
a := s.Appender(context.TODO())
var lastTs int64
// Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500.
lastTs = 500
lbls := labelsForTest(t.Name()+"batch-1", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ {
_, err := a.Append(0, lset, lastTs, 0)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
}
// Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600.
lastTs = 600
lbls = labelsForTest(t.Name()+"batch-2", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ {
_, err := a.Append(0, lset, lastTs, 0)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
}
// Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series.
s.truncate(lastTs - 1)
m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
require.Equal(t, m.Metric[0].Gauge.GetValue(), float64(numSeries), "agent wal truncate mismatch of deleted series count")
}
func TestWALReplay(t *testing.T) {
const (
numDatapoints = 1000
numSeries = 8
lastTs = 500
)
promAgentDir, err := ioutil.TempDir("", t.Name())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(promAgentDir))
})
lbls := labelsForTest(t.Name(), numSeries)
opts := DefaultOptions()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
a := s.Appender(context.TODO())
for _, l := range lbls {
lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ {
_, err := a.Append(0, lset, lastTs, 0)
require.NoError(t, err)
}
}
require.NoError(t, a.Commit())
restartOpts := DefaultOptions()
restartLogger := log.NewNopLogger()
restartReg := prometheus.NewRegistry()
s, err = Open(restartLogger, restartReg, nil, promAgentDir, restartOpts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
// Check if all the series are retrieved back from the WAL.
m := gatherFamily(t, restartReg, "prometheus_agent_active_series")
require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count")
// Check if lastTs of the samples retrieved from the WAL is retained.
metrics := s.series.series
for i := 0; i < len(metrics); i++ {
mp := metrics[i]
for _, v := range mp {
require.Equal(t, v.lastTs, int64(lastTs))
}
}
}
func startTime() (int64, error) {
return time.Now().Unix() * 1000, nil
}
// Create series for tests.
func labelsForTest(lName string, seriesCount int) []labels.Labels {
var series []labels.Labels
for i := 0; i < seriesCount; i++ {
lset := labels.Labels{
{Name: "a", Value: lName},
{Name: "job", Value: "prometheus"},
{Name: "instance", Value: "localhost" + strconv.Itoa(i)},
}
series = append(series, lset)
}
return series
}
func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto.MetricFamily {
t.Helper()
families, err := reg.Gather()
require.NoError(t, err, "failed to gather metrics")
for _, f := range families {
if f.GetName() == familyName {
return f
}
}
t.Fatalf("could not find family %s", familyName)
return nil
}

177
tsdb/agent/series.go Normal file
View file

@ -0,0 +1,177 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"sync"
"github.com/prometheus/prometheus/pkg/labels"
)
// memSeries is a chunkless version of tsdb.memSeries.
type memSeries struct {
sync.Mutex
ref uint64
lset labels.Labels
lastTs int64
}
// seriesHashmap is a simple hashmap for memSeries by their label set.
// It is built on top of a regular hashmap and holds a slice of series to
// resolve hash collisions. Its methods require the hash to be submitted
// with the label set to avoid re-computing hash throughout the code.
type seriesHashmap map[uint64][]*memSeries
func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
for _, s := range m[hash] {
if labels.Equal(s.lset, lset) {
return s
}
}
return nil
}
func (m seriesHashmap) Set(hash uint64, s *memSeries) {
seriesSet := m[hash]
for i, prev := range seriesSet {
if labels.Equal(prev.lset, s.lset) {
seriesSet[i] = s
return
}
}
m[hash] = append(seriesSet, s)
}
func (m seriesHashmap) Delete(hash uint64, ref uint64) {
var rem []*memSeries
for _, s := range m[hash] {
if s.ref != ref {
rem = append(rem, s)
}
}
if len(rem) == 0 {
delete(m, hash)
} else {
m[hash] = rem
}
}
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock
// contention. The locks are padded to not be on the same cache line.
// Filling the padded space with the maps was profiled to be slower -
// likely due to the additional pointer dereferences.
type stripeSeries struct {
size int
series []map[uint64]*memSeries
hashes []seriesHashmap
locks []stripeLock
}
type stripeLock struct {
sync.RWMutex
// Padding to avoid multiple locks being on the same cache line.
_ [40]byte
}
func newStripeSeries(stripeSize int) *stripeSeries {
s := &stripeSeries{
size: stripeSize,
series: make([]map[uint64]*memSeries, stripeSize),
hashes: make([]seriesHashmap, stripeSize),
locks: make([]stripeLock, stripeSize),
}
for i := range s.series {
s.series[i] = map[uint64]*memSeries{}
}
for i := range s.hashes {
s.hashes[i] = seriesHashmap{}
}
return s
}
// GC garbage collects old series that have not received a sample after mint
// and will fully delete them.
func (s *stripeSeries) GC(mint int64) map[uint64]struct{} {
deleted := map[uint64]struct{}{}
for hashLock := 0; hashLock < s.size; hashLock++ {
s.locks[hashLock].Lock()
for hash, all := range s.hashes[hashLock] {
for _, series := range all {
series.Lock()
// Any series that has received a write since mint is still alive.
if series.lastTs >= mint {
series.Unlock()
continue
}
// The series is stale. We need to obtain a second lock for the
// ref if it's different than the hash lock.
refLock := int(series.ref) & (s.size - 1)
if hashLock != refLock {
s.locks[refLock].Lock()
}
deleted[series.ref] = struct{}{}
delete(s.series[refLock], series.ref)
s.hashes[hashLock].Delete(hash, series.ref)
if hashLock != refLock {
s.locks[refLock].Unlock()
}
series.Unlock()
}
}
s.locks[hashLock].Unlock()
}
return deleted
}
func (s *stripeSeries) GetByID(id uint64) *memSeries {
refLock := id & uint64(s.size-1)
s.locks[refLock].RLock()
defer s.locks[refLock].RUnlock()
return s.series[refLock][id]
}
func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
hashLock := hash & uint64(s.size-1)
s.locks[hashLock].RLock()
defer s.locks[hashLock].RUnlock()
return s.hashes[hashLock].Get(hash, lset)
}
func (s *stripeSeries) Set(hash uint64, series *memSeries) {
var (
hashLock = hash & uint64(s.size-1)
refLock = series.ref & uint64(s.size-1)
)
s.locks[hashLock].Lock()
defer s.locks[hashLock].Unlock()
if hashLock != refLock {
s.locks[refLock].Lock()
defer s.locks[refLock].Unlock()
}
s.hashes[hashLock].Set(hash, series)
s.series[refLock][series.ref] = series
}

View file

@ -181,6 +181,7 @@ type API struct {
buildInfo *PrometheusVersion buildInfo *PrometheusVersion
runtimeInfo func() (RuntimeInfo, error) runtimeInfo func() (RuntimeInfo, error)
gatherer prometheus.Gatherer gatherer prometheus.Gatherer
isAgent bool
remoteWriteHandler http.Handler remoteWriteHandler http.Handler
remoteReadHandler http.Handler remoteReadHandler http.Handler
@ -211,6 +212,7 @@ func NewAPI(
remoteReadSampleLimit int, remoteReadSampleLimit int,
remoteReadConcurrencyLimit int, remoteReadConcurrencyLimit int,
remoteReadMaxBytesInFrame int, remoteReadMaxBytesInFrame int,
isAgent bool,
CORSOrigin *regexp.Regexp, CORSOrigin *regexp.Regexp,
runtimeInfo func() (RuntimeInfo, error), runtimeInfo func() (RuntimeInfo, error),
buildInfo *PrometheusVersion, buildInfo *PrometheusVersion,
@ -239,6 +241,7 @@ func NewAPI(
runtimeInfo: runtimeInfo, runtimeInfo: runtimeInfo,
buildInfo: buildInfo, buildInfo: buildInfo,
gatherer: gatherer, gatherer: gatherer,
isAgent: isAgent,
remoteReadHandler: remote.NewReadHandler(logger, registerer, q, configFunc, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame), remoteReadHandler: remote.NewReadHandler(logger, registerer, q, configFunc, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame),
} }
@ -282,26 +285,35 @@ func (api *API) Register(r *route.Router) {
}.ServeHTTP) }.ServeHTTP)
} }
wrapAgent := func(f apiFunc) http.HandlerFunc {
return wrap(func(r *http.Request) apiFuncResult {
if api.isAgent {
return apiFuncResult{nil, &apiError{errorExec, errors.New("unavailable with Prometheus Agent")}, nil, nil}
}
return f(r)
})
}
r.Options("/*path", wrap(api.options)) r.Options("/*path", wrap(api.options))
r.Get("/query", wrap(api.query)) r.Get("/query", wrapAgent(api.query))
r.Post("/query", wrap(api.query)) r.Post("/query", wrapAgent(api.query))
r.Get("/query_range", wrap(api.queryRange)) r.Get("/query_range", wrapAgent(api.queryRange))
r.Post("/query_range", wrap(api.queryRange)) r.Post("/query_range", wrapAgent(api.queryRange))
r.Get("/query_exemplars", wrap(api.queryExemplars)) r.Get("/query_exemplars", wrapAgent(api.queryExemplars))
r.Post("/query_exemplars", wrap(api.queryExemplars)) r.Post("/query_exemplars", wrapAgent(api.queryExemplars))
r.Get("/labels", wrap(api.labelNames)) r.Get("/labels", wrapAgent(api.labelNames))
r.Post("/labels", wrap(api.labelNames)) r.Post("/labels", wrapAgent(api.labelNames))
r.Get("/label/:name/values", wrap(api.labelValues)) r.Get("/label/:name/values", wrapAgent(api.labelValues))
r.Get("/series", wrap(api.series)) r.Get("/series", wrapAgent(api.series))
r.Post("/series", wrap(api.series)) r.Post("/series", wrapAgent(api.series))
r.Del("/series", wrap(api.dropSeries)) r.Del("/series", wrapAgent(api.dropSeries))
r.Get("/targets", wrap(api.targets)) r.Get("/targets", wrap(api.targets))
r.Get("/targets/metadata", wrap(api.targetMetadata)) r.Get("/targets/metadata", wrap(api.targetMetadata))
r.Get("/alertmanagers", wrap(api.alertmanagers)) r.Get("/alertmanagers", wrapAgent(api.alertmanagers))
r.Get("/metadata", wrap(api.metricMetadata)) r.Get("/metadata", wrap(api.metricMetadata))
@ -309,22 +321,22 @@ func (api *API) Register(r *route.Router) {
r.Get("/status/runtimeinfo", wrap(api.serveRuntimeInfo)) r.Get("/status/runtimeinfo", wrap(api.serveRuntimeInfo))
r.Get("/status/buildinfo", wrap(api.serveBuildInfo)) r.Get("/status/buildinfo", wrap(api.serveBuildInfo))
r.Get("/status/flags", wrap(api.serveFlags)) r.Get("/status/flags", wrap(api.serveFlags))
r.Get("/status/tsdb", wrap(api.serveTSDBStatus)) r.Get("/status/tsdb", wrapAgent(api.serveTSDBStatus))
r.Get("/status/walreplay", api.serveWALReplayStatus) r.Get("/status/walreplay", api.serveWALReplayStatus)
r.Post("/read", api.ready(api.remoteRead)) r.Post("/read", api.ready(api.remoteRead))
r.Post("/write", api.ready(api.remoteWrite)) r.Post("/write", api.ready(api.remoteWrite))
r.Get("/alerts", wrap(api.alerts)) r.Get("/alerts", wrapAgent(api.alerts))
r.Get("/rules", wrap(api.rules)) r.Get("/rules", wrapAgent(api.rules))
// Admin APIs // Admin APIs
r.Post("/admin/tsdb/delete_series", wrap(api.deleteSeries)) r.Post("/admin/tsdb/delete_series", wrapAgent(api.deleteSeries))
r.Post("/admin/tsdb/clean_tombstones", wrap(api.cleanTombstones)) r.Post("/admin/tsdb/clean_tombstones", wrapAgent(api.cleanTombstones))
r.Post("/admin/tsdb/snapshot", wrap(api.snapshot)) r.Post("/admin/tsdb/snapshot", wrapAgent(api.snapshot))
r.Put("/admin/tsdb/delete_series", wrap(api.deleteSeries)) r.Put("/admin/tsdb/delete_series", wrapAgent(api.deleteSeries))
r.Put("/admin/tsdb/clean_tombstones", wrap(api.cleanTombstones)) r.Put("/admin/tsdb/clean_tombstones", wrapAgent(api.cleanTombstones))
r.Put("/admin/tsdb/snapshot", wrap(api.snapshot)) r.Put("/admin/tsdb/snapshot", wrapAgent(api.snapshot))
} }
type queryData struct { type queryData struct {

View file

@ -248,6 +248,7 @@ type Options struct {
RemoteReadConcurrencyLimit int RemoteReadConcurrencyLimit int
RemoteReadBytesInFrame int RemoteReadBytesInFrame int
RemoteWriteReceiver bool RemoteWriteReceiver bool
IsAgent bool
Gatherer prometheus.Gatherer Gatherer prometheus.Gatherer
Registerer prometheus.Registerer Registerer prometheus.Registerer
@ -328,6 +329,7 @@ func New(logger log.Logger, o *Options) *Handler {
h.options.RemoteReadSampleLimit, h.options.RemoteReadSampleLimit,
h.options.RemoteReadConcurrencyLimit, h.options.RemoteReadConcurrencyLimit,
h.options.RemoteReadBytesInFrame, h.options.RemoteReadBytesInFrame,
h.options.IsAgent,
h.options.CORSOrigin, h.options.CORSOrigin,
h.runtimeInfo, h.runtimeInfo,
h.versionInfo, h.versionInfo,

View file

@ -526,6 +526,68 @@ func TestHandleMultipleQuitRequests(t *testing.T) {
} }
} }
// Test for availability of API endpoints in Prometheus Agent mode.
func TestAgentAPIEndPoints(t *testing.T) {
t.Parallel()
opts := &Options{
ListenAddress: ":9090",
ReadTimeout: 30 * time.Second,
MaxConnections: 512,
Context: nil,
Storage: nil,
QueryEngine: nil,
ScrapeManager: &scrape.Manager{},
RuleManager: &rules.Manager{},
Notifier: nil,
RoutePrefix: "/",
EnableAdminAPI: true,
ExternalURL: &url.URL{
Scheme: "http",
Host: "localhost:9090",
Path: "/",
},
Version: &PrometheusVersion{},
Gatherer: prometheus.DefaultGatherer,
IsAgent: true,
}
opts.Flags = map[string]string{}
webHandler := New(nil, opts)
webHandler.Ready()
// Test for non-available endpoints in the Agent mode.
for _, u := range []string{
"http://localhost:9090/-/labels",
"http://localhost:9090/label",
"http://localhost:9090/series",
"http://localhost:9090/alertmanagers",
"http://localhost:9090/query",
"http://localhost:9090/query_range",
"http://localhost:9090/query_exemplars",
} {
w := httptest.NewRecorder()
req, err := http.NewRequest("GET", u, nil)
require.NoError(t, err)
webHandler.router.ServeHTTP(w, req)
require.Equal(t, http.StatusNotFound, w.Code)
}
// Test for available endpoints in the Agent mode.
for _, u := range []string{
"http://localhost:9090/targets",
"http://localhost:9090/status",
} {
w := httptest.NewRecorder()
req, err := http.NewRequest("GET", u, nil)
require.NoError(t, err)
webHandler.router.ServeHTTP(w, req)
fmt.Println(u)
require.Equal(t, http.StatusOK, w.Code)
}
}
func cleanupTestResponse(t *testing.T, resp *http.Response) { func cleanupTestResponse(t *testing.T, resp *http.Response) {
_, err := io.Copy(ioutil.Discard, resp.Body) _, err := io.Copy(ioutil.Discard, resp.Body)
require.NoError(t, err) require.NoError(t, err)