diff --git a/.gitignore b/.gitignore index b382f6c0e..35564c059 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,9 @@ /promtool benchmark.txt /data +/data-agent /cmd/prometheus/data +/cmd/prometheus/data-agent /cmd/prometheus/debug /benchout /cmd/promtool/data diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d50cd3e7f..e69e9f2db 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -72,11 +72,14 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/agent" "github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/web" ) var ( + appName = "prometheus" + configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "prometheus_config_last_reload_successful", Help: "Whether the last configuration reload attempt was successful.", @@ -88,10 +91,13 @@ var ( defaultRetentionString = "15d" defaultRetentionDuration model.Duration + + agentMode bool + agentOnlyFlags, serverOnlyFlags []string ) func init() { - prometheus.MustRegister(version.NewCollector("prometheus")) + prometheus.MustRegister(version.NewCollector(strings.ReplaceAll(appName, "-", "_"))) var err error defaultRetentionDuration, err = model.ParseDuration(defaultRetentionString) @@ -100,6 +106,37 @@ func init() { } } +// agentOnlySetting can be provided to a kingpin flag's PreAction to mark a +// flag as agent-only. +func agentOnlySetting() func(*kingpin.ParseContext) error { + return func(pc *kingpin.ParseContext) error { + agentOnlyFlags = append(agentOnlyFlags, extractFlagName(pc)) + return nil + } +} + +// serverOnlySetting can be provided to a kingpin flag's PreAction to mark a +// flag as server-only. +func serverOnlySetting() func(*kingpin.ParseContext) error { + return func(pc *kingpin.ParseContext) error { + serverOnlyFlags = append(serverOnlyFlags, extractFlagName(pc)) + return nil + } +} + +// extractFlagName gets the flag name from the ParseContext. Only call +// from agentOnlySetting or serverOnlySetting. +func extractFlagName(pc *kingpin.ParseContext) string { + for _, pe := range pc.Elements { + fc, ok := pe.Clause.(*kingpin.FlagClause) + if !ok { + continue + } + return fc.Model().Name + } + panic("extractFlagName not called from a kingpin PreAction. This is a bug, please report to Prometheus.") +} + type flagConfig struct { configFile string @@ -111,6 +148,7 @@ type flagConfig struct { web web.Options scrape scrape.Options tsdb tsdbOptions + agent agentOptions lookbackDelta model.Duration webTimeout model.Duration queryTimeout model.Duration @@ -196,10 +234,12 @@ func main() { a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server").UsageWriter(os.Stdout) - a.Version(version.Print("prometheus")) + a.Version(version.Print(appName)) a.HelpFlag.Short('h') + a.Flag("agent", "Agent mode.").BoolVar(&agentMode) + a.Flag("config.file", "Prometheus configuration file path."). Default("prometheus.yml").StringVar(&cfg.configFile) @@ -245,60 +285,105 @@ func main() { Default(".*").StringVar(&cfg.corsRegexString) a.Flag("storage.tsdb.path", "Base path for metrics storage."). + PreAction(serverOnlySetting()). Default("data/").StringVar(&cfg.localStoragePath) a.Flag("storage.tsdb.min-block-duration", "Minimum duration of a data block before being persisted. For use in testing."). + PreAction(serverOnlySetting()). 
Hidden().Default("2h").SetValue(&cfg.tsdb.MinBlockDuration) a.Flag("storage.tsdb.max-block-duration", "Maximum duration compacted blocks may span. For use in testing. (Defaults to 10% of the retention period.)"). + PreAction(serverOnlySetting()). Hidden().PlaceHolder("").SetValue(&cfg.tsdb.MaxBlockDuration) a.Flag("storage.tsdb.max-block-chunk-segment-size", "The maximum size for a single chunk segment in a block. Example: 512MB"). + PreAction(serverOnlySetting()). Hidden().PlaceHolder("").BytesVar(&cfg.tsdb.MaxBlockChunkSegmentSize) a.Flag("storage.tsdb.wal-segment-size", "Size at which to split the tsdb WAL segment files. Example: 100MB"). + PreAction(serverOnlySetting()). Hidden().PlaceHolder("").BytesVar(&cfg.tsdb.WALSegmentSize) a.Flag("storage.tsdb.retention", "[DEPRECATED] How long to retain samples in storage. This flag has been deprecated, use \"storage.tsdb.retention.time\" instead."). + PreAction(serverOnlySetting()). SetValue(&oldFlagRetentionDuration) a.Flag("storage.tsdb.retention.time", "How long to retain samples in storage. When this flag is set it overrides \"storage.tsdb.retention\". If neither this flag nor \"storage.tsdb.retention\" nor \"storage.tsdb.retention.size\" is set, the retention time defaults to "+defaultRetentionString+". Units Supported: y, w, d, h, m, s, ms."). + PreAction(serverOnlySetting()). SetValue(&newFlagRetentionDuration) a.Flag("storage.tsdb.retention.size", "Maximum number of bytes that can be stored for blocks. A unit is required, supported units: B, KB, MB, GB, TB, PB, EB. Ex: \"512MB\"."). + PreAction(serverOnlySetting()). BytesVar(&cfg.tsdb.MaxBytes) a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory."). + PreAction(serverOnlySetting()). Default("false").BoolVar(&cfg.tsdb.NoLockfile) a.Flag("storage.tsdb.allow-overlapping-blocks", "Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge."). + PreAction(serverOnlySetting()). Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks) a.Flag("storage.tsdb.wal-compression", "Compress the tsdb WAL."). + PreAction(serverOnlySetting()). Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression) + a.Flag("storage.agent.path", "Base path for metrics storage."). + PreAction(agentOnlySetting()). + Default("data-agent/").StringVar(&cfg.localStoragePath) + + a.Flag("storage.agent.segment-size", + "Size at which to split WAL segment files. Example: 100MB"). + PreAction(agentOnlySetting()). + Hidden().PlaceHolder("").BytesVar(&cfg.agent.WALSegmentSize) + + a.Flag("storage.agent.compression", "Compress the agent WAL."). + PreAction(agentOnlySetting()). + Default("true").BoolVar(&cfg.agent.WALCompression) + + a.Flag("storage.agent.truncate-frequency", + "The frequency at which to truncate the WAL and remove old data."). + PreAction(agentOnlySetting()). + Hidden().PlaceHolder("").SetValue(&cfg.agent.TruncateFrequency) + + a.Flag("storage.agent.retention.min-time", + "Minimum age samples may be before being considered for deletion when the WAL is truncated"). + PreAction(agentOnlySetting()). + SetValue(&cfg.agent.MinWALTime) + + a.Flag("storage.agent.retention.max-time", + "Maximum age samples may be before being forcibly deleted when the WAL is truncated"). + PreAction(agentOnlySetting()). + SetValue(&cfg.agent.MaxWALTime) + a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). 
Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types."). + PreAction(serverOnlySetting()). Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit) a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit."). + PreAction(serverOnlySetting()). Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit) a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well. 1MB as recommended by protobuf by default."). + PreAction(serverOnlySetting()). Default("1048576").IntVar(&cfg.web.RemoteReadBytesInFrame) a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). + PreAction(serverOnlySetting()). Default("1h").SetValue(&cfg.outageTolerance) a.Flag("rules.alert.for-grace-period", "Minimum duration between alert and restored \"for\" state. This is maintained only for alerts with configured \"for\" time greater than grace period."). + PreAction(serverOnlySetting()). Default("10m").SetValue(&cfg.forGracePeriod) a.Flag("rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). + PreAction(serverOnlySetting()). Default("1m").SetValue(&cfg.resendDelay) a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). @@ -308,21 +393,26 @@ func main() { Hidden().Default("2ms").DurationVar(&scrape.ScrapeTimestampTolerance) a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). + PreAction(serverOnlySetting()). Default("10000").IntVar(&cfg.notifier.QueueCapacity) // TODO: Remove in Prometheus 3.0. alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String() a.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations and federation."). + PreAction(serverOnlySetting()). Default("5m").SetValue(&cfg.lookbackDelta) a.Flag("query.timeout", "Maximum time a query may take before being aborted."). + PreAction(serverOnlySetting()). Default("2m").SetValue(&cfg.queryTimeout) a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently."). + PreAction(serverOnlySetting()). Default("20").IntVar(&cfg.queryConcurrency) a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). + PreAction(serverOnlySetting()). Default("50000000").IntVar(&cfg.queryMaxSamples) a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver, extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). 
@@ -390,7 +480,8 @@ func main() { // RoutePrefix must always be at least '/'. cfg.web.RoutePrefix = "/" + strings.Trim(cfg.web.RoutePrefix, "/") - { // Time retention settings. + if !agentMode { + // Time retention settings. if oldFlagRetentionDuration != 0 { level.Warn(logger).Log("deprecation_notice", "'storage.tsdb.retention' flag is deprecated use 'storage.tsdb.retention.time' instead.") cfg.tsdb.RetentionDuration = oldFlagRetentionDuration @@ -415,9 +506,8 @@ func main() { cfg.tsdb.RetentionDuration = y level.Warn(logger).Log("msg", "Time retention value is too high. Limiting to: "+y.String()) } - } - { // Max block size settings. + // Max block size settings. if cfg.tsdb.MaxBlockDuration == 0 { maxBlockDuration, err := model.ParseDuration("31d") if err != nil { @@ -483,7 +573,12 @@ func main() { var ( scrapeManager = scrape.NewManager(&cfg.scrape, log.With(logger, "component", "scrape manager"), fanoutStorage) - opts = promql.EngineOpts{ + queryEngine *promql.Engine + ruleManager *rules.Manager + ) + + if !agentMode { + opts := promql.EngineOpts{ Logger: log.With(logger, "component", "query engine"), Reg: prometheus.DefaultRegisterer, MaxSamples: cfg.queryMaxSamples, @@ -510,7 +605,7 @@ func main() { ForGracePeriod: time.Duration(cfg.forGracePeriod), ResendDelay: time.Duration(cfg.resendDelay), }) - ) + } scraper.Set(scrapeManager) @@ -526,6 +621,7 @@ func main() { cfg.web.RuleManager = ruleManager cfg.web.Notifier = notifierManager cfg.web.LookbackDelta = time.Duration(cfg.lookbackDelta) + cfg.web.IsAgent = agentMode cfg.web.Version = &web.PrometheusVersion{ Version: version.Version, @@ -572,6 +668,11 @@ func main() { }, { name: "query_engine", reloader: func(cfg *config.Config) error { + if agentMode { + // No-op in Agent mode. + return nil + } + if cfg.GlobalConfig.QueryLogFile == "" { queryEngine.SetQueryLogger(nil) return nil @@ -613,6 +714,11 @@ func main() { }, { name: "rules", reloader: func(cfg *config.Config) error { + if agentMode { + // No-op in Agent mode + return nil + } + // Get all rule files matching the configuration paths. var files []string for _, pat := range cfg.RuleFiles { @@ -817,7 +923,7 @@ func main() { }, ) } - { + if !agentMode { // Rule manager. g.Add( func() error { @@ -829,8 +935,7 @@ func main() { ruleManager.Stop() }, ) - } - { + // TSDB. opts := cfg.tsdb.ToTSDBOptions() cancel := make(chan struct{}) @@ -892,6 +997,59 @@ func main() { }, ) } + if agentMode { + // WAL storage. + opts := cfg.agent.ToAgentOptions() + cancel := make(chan struct{}) + g.Add( + func() error { + level.Info(logger).Log("msg", "Starting WAL storage ...") + if cfg.agent.WALSegmentSize != 0 { + if cfg.agent.WALSegmentSize < 10*1024*1024 || cfg.agent.WALSegmentSize > 256*1024*1024 { + return errors.New("flag 'storage.agent.segment-size' must be set between 10MB and 256MB") + } + } + db, err := agent.Open( + logger, + prometheus.DefaultRegisterer, + remoteStorage, + cfg.localStoragePath, + &opts, + ) + if err != nil { + return errors.Wrap(err, "opening storage failed") + } + + switch fsType := prom_runtime.Statfs(cfg.localStoragePath); fsType { + case "NFS_SUPER_MAGIC": + level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. 
Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.") + default: + level.Info(logger).Log("fs_type", fsType) + } + + level.Info(logger).Log("msg", "Agent WAL storage started") + level.Debug(logger).Log("msg", "Agent WAL storage options", + "WALSegmentSize", cfg.agent.WALSegmentSize, + "WALCompression", cfg.agent.WALCompression, + "StripeSize", cfg.agent.StripeSize, + "TruncateFrequency", cfg.agent.TruncateFrequency, + "MinWALTime", cfg.agent.MinWALTime, + "MaxWALTime", cfg.agent.MaxWALTime, + ) + + localStorage.Set(db, 0) + close(dbOpen) + <-cancel + return nil + }, + func(e error) { + if err := fanoutStorage.Close(); err != nil { + level.Error(logger).Log("msg", "Error stopping storage", "err", err) + } + close(cancel) + }, + ) + } { // Web handler. g.Add( @@ -1015,6 +1173,25 @@ func reloadConfig(filename string, expandExternalLabels bool, enableExemplarStor } } + // Perform validation for Agent-compatible configs and remove anything that's unsupported. + if agentMode { + // Perform validation for Agent-compatible configs and remove anything that's + // unsupported. + if len(conf.AlertingConfig.AlertRelabelConfigs) > 0 || len(conf.AlertingConfig.AlertmanagerConfigs) > 0 { + level.Warn(logger).Log("msg", "alerting configs not supported in agent mode") + conf.AlertingConfig.AlertRelabelConfigs = []*relabel.Config{} + conf.AlertingConfig.AlertmanagerConfigs = config.AlertmanagerConfigs{} + } + if len(conf.RuleFiles) > 0 { + level.Warn(logger).Log("msg", "recording rules not supported in agent mode") + conf.RuleFiles = []string{} + } + if len(conf.RemoteReadConfigs) > 0 { + level.Warn(logger).Log("msg", "remote_read configs not supported in agent mode") + conf.RemoteReadConfigs = []*config.RemoteReadConfig{} + } + } + failed := false for _, rl := range rls { rstart := time.Now() @@ -1115,18 +1292,21 @@ func sendAlerts(s sender, externalURL string) rules.NotifyFunc { // storage at a later point in time. type readyStorage struct { mtx sync.RWMutex - db *tsdb.DB + db storage.Storage startTimeMargin int64 stats *tsdb.DBStats } func (s *readyStorage) ApplyConfig(conf *config.Config) error { db := s.get() - return db.ApplyConfig(conf) + if db, ok := db.(*tsdb.DB); ok { + return db.ApplyConfig(conf) + } + return nil } // Set the storage. -func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) { +func (s *readyStorage) Set(db storage.Storage, startTimeMargin int64) { s.mtx.Lock() defer s.mtx.Unlock() @@ -1134,7 +1314,7 @@ func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) { s.startTimeMargin = startTimeMargin } -func (s *readyStorage) get() *tsdb.DB { +func (s *readyStorage) get() storage.Storage { s.mtx.RLock() x := s.db s.mtx.RUnlock() @@ -1151,15 +1331,21 @@ func (s *readyStorage) getStats() *tsdb.DBStats { // StartTime implements the Storage interface. func (s *readyStorage) StartTime() (int64, error) { if x := s.get(); x != nil { - var startTime int64 - - if len(x.Blocks()) > 0 { - startTime = x.Blocks()[0].Meta().MinTime - } else { - startTime = time.Now().Unix() * 1000 + switch db := x.(type) { + case *tsdb.DB: + var startTime int64 + if len(db.Blocks()) > 0 { + startTime = db.Blocks()[0].Meta().MinTime + } else { + startTime = time.Now().Unix() * 1000 + } + // Add a safety margin as it may take a few minutes for everything to spin up. 
+ return startTime + s.startTimeMargin, nil + case *agent.DB: + return db.StartTime() + default: + panic(fmt.Sprintf("unknown storage type %T", db)) } - // Add a safety margin as it may take a few minutes for everything to spin up. - return startTime + s.startTimeMargin, nil } return math.MaxInt64, tsdb.ErrNotReady @@ -1183,7 +1369,14 @@ func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (stor func (s *readyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { if x := s.get(); x != nil { - return x.ExemplarQuerier(ctx) + switch db := x.(type) { + case *tsdb.DB: + return db.ExemplarQuerier(ctx) + case *agent.DB: + return nil, agent.ErrUnsupported + default: + panic(fmt.Sprintf("unknown storage type %T", db)) + } } return nil, tsdb.ErrNotReady } @@ -1221,7 +1414,14 @@ func (s *readyStorage) Close() error { // CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. func (s *readyStorage) CleanTombstones() error { if x := s.get(); x != nil { - return x.CleanTombstones() + switch db := x.(type) { + case *tsdb.DB: + return db.CleanTombstones() + case *agent.DB: + return agent.ErrUnsupported + default: + panic(fmt.Sprintf("unknown storage type %T", db)) + } } return tsdb.ErrNotReady } @@ -1229,7 +1429,14 @@ func (s *readyStorage) CleanTombstones() error { // Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error { if x := s.get(); x != nil { - return x.Delete(mint, maxt, ms...) + switch db := x.(type) { + case *tsdb.DB: + return db.Delete(mint, maxt, ms...) + case *agent.DB: + return agent.ErrUnsupported + default: + panic(fmt.Sprintf("unknown storage type %T", db)) + } } return tsdb.ErrNotReady } @@ -1237,7 +1444,14 @@ func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error { // Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. func (s *readyStorage) Snapshot(dir string, withHead bool) error { if x := s.get(); x != nil { - return x.Snapshot(dir, withHead) + switch db := x.(type) { + case *tsdb.DB: + return db.Snapshot(dir, withHead) + case *agent.DB: + return agent.ErrUnsupported + default: + panic(fmt.Sprintf("unknown storage type %T", db)) + } } return tsdb.ErrNotReady } @@ -1245,7 +1459,14 @@ func (s *readyStorage) Snapshot(dir string, withHead bool) error { // Stats implements the api_v1.TSDBAdminStats interface. func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) { if x := s.get(); x != nil { - return x.Head().Stats(statsByLabelName), nil + switch db := x.(type) { + case *tsdb.DB: + return db.Head().Stats(statsByLabelName), nil + case *agent.DB: + return nil, agent.ErrUnsupported + default: + panic(fmt.Sprintf("unknown storage type %T", db)) + } } return nil, tsdb.ErrNotReady } @@ -1323,6 +1544,27 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { } } +// agentOptions is a version of agent.Options with defined units. This is required +// as agent.Options fields are unit agnostic (time).
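Each readyStorage admin method above repeats the same *tsdb.DB / *agent.DB type switch. As a design note, the dispatch could be factored into one helper; the sketch below is illustrative only, is not part of this change, and assumes the imports already present in cmd/prometheus/main.go (method names are hypothetical):

```go
// unwrapTSDB returns the underlying TSDB when running as a server,
// agent.ErrUnsupported when running as an agent, and tsdb.ErrNotReady before
// the storage has been set.
func (s *readyStorage) unwrapTSDB() (*tsdb.DB, error) {
	switch db := s.get().(type) {
	case *tsdb.DB:
		return db, nil
	case *agent.DB:
		return nil, agent.ErrUnsupported
	case nil:
		return nil, tsdb.ErrNotReady
	default:
		panic(fmt.Sprintf("unknown storage type %T", db))
	}
}

// CleanTombstones and the other admin methods could then shrink to a few lines.
func (s *readyStorage) cleanTombstonesViaHelper() error {
	db, err := s.unwrapTSDB()
	if err != nil {
		return err
	}
	return db.CleanTombstones()
}
```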
+type agentOptions struct { + WALSegmentSize units.Base2Bytes + WALCompression bool + StripeSize int + TruncateFrequency model.Duration + MinWALTime, MaxWALTime model.Duration +} + +func (opts agentOptions) ToAgentOptions() agent.Options { + return agent.Options{ + WALSegmentSize: int(opts.WALSegmentSize), + WALCompression: opts.WALCompression, + StripeSize: opts.StripeSize, + TruncateFrequency: time.Duration(opts.TruncateFrequency), + MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), + MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)), + } +} + func initTracing(logger log.Logger) (io.Closer, error) { // Set tracing configuration defaults. cfg := &jcfg.Configuration{ diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index 98b2dd9cf..6f9a1d566 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -347,3 +347,23 @@ func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames } return res } + +func TestAgentSuccessfulStartup(t *testing.T) { + prom := exec.Command(promPath, "-test.main", "--agent", "--config.file="+promConfig) + err := prom.Start() + require.NoError(t, err) + + expectedExitStatus := 0 + actualExitStatus := 0 + + done := make(chan error, 1) + go func() { done <- prom.Wait() }() + select { + case err := <-done: + t.Logf("prometheus agent should be still running: %v", err) + actualExitStatus = prom.ProcessState.ExitCode() + case <-time.After(5 * time.Second): + prom.Process.Kill() + } + require.Equal(t, expectedExitStatus, actualExitStatus) +} diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 5716605a8..df20af713 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -179,6 +179,11 @@ func (s *Storage) Appender(ctx context.Context) storage.Appender { return s.rws.Appender(ctx) } +// LowestSentTimestamp returns the lowest sent timestamp across all queues. +func (s *Storage) LowestSentTimestamp() int64 { + return s.rws.LowestSentTimestamp() +} + // Close the background processing of the storage queues. func (s *Storage) Close() error { s.mtx.Lock() diff --git a/storage/remote/write.go b/storage/remote/write.go index b3fec364a..b56d258d7 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -16,6 +16,7 @@ package remote import ( "context" "fmt" + "math" "sync" "time" @@ -207,6 +208,26 @@ func (rws *WriteStorage) Appender(_ context.Context) storage.Appender { } } +// LowestSentTimestamp returns the lowest sent timestamp across all queues. +func (rws *WriteStorage) LowestSentTimestamp() int64 { + rws.mtx.Lock() + defer rws.mtx.Unlock() + + var lowestTs int64 = math.MaxInt64 + + for _, q := range rws.queues { + ts := int64(q.metrics.highestSentTimestamp.Get() * 1000) + if ts < lowestTs { + lowestTs = ts + } + } + if len(rws.queues) == 0 { + lowestTs = 0 + } + + return lowestTs +} + // Close closes the WriteStorage. func (rws *WriteStorage) Close() error { rws.mtx.Lock() diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go new file mode 100644 index 000000000..40e192f5c --- /dev/null +++ b/tsdb/agent/db.go @@ -0,0 +1,761 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "context" + "fmt" + "path/filepath" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/wal" + "go.uber.org/atomic" +) + +var ( + ErrUnsupported = errors.New("unsupported operation with WAL-only storage") +) + +// Default values for options. +var ( + DefaultTruncateFrequency = 2 * time.Hour + DefaultMinWALTime = int64(5 * time.Minute / time.Millisecond) + DefaultMaxWALTime = int64(4 * time.Hour / time.Millisecond) +) + +// Options of the WAL storage. +type Options struct { + // Segments (wal files) max size. + // WALSegmentSize <= 0, segment size is default size. + // WALSegmentSize > 0, segment size is WALSegmentSize. + WALSegmentSize int + + // WALCompression will turn on Snappy compression for records on the WAL. + WALCompression bool + + // StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance. + StripeSize int + + // TruncateFrequency determines how frequently to truncate data from the WAL. + TruncateFrequency time.Duration + + // Shortest and longest amount of time data can exist in the WAL before being + // deleted. + MinWALTime, MaxWALTime int64 +} + +// DefaultOptions used for the WAL storage. They are sane for setups using +// millisecond-precision timestamps. 
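Note the mixed units in Options: TruncateFrequency is a plain time.Duration, while MinWALTime and MaxWALTime are ages expressed in milliseconds (compare DefaultMinWALTime and DefaultMaxWALTime above). A small usage sketch, assuming only the exported API introduced in this file:

```go
package main

import (
	"time"

	"github.com/prometheus/prometheus/tsdb/agent"
)

// customAgentOptions tweaks a few defaults; note which fields take a
// time.Duration and which take milliseconds.
func customAgentOptions() *agent.Options {
	opts := agent.DefaultOptions()
	opts.WALCompression = true
	opts.TruncateFrequency = 30 * time.Minute                   // time.Duration
	opts.MinWALTime = int64(2 * time.Minute / time.Millisecond) // 120000 ms
	opts.MaxWALTime = int64(6 * time.Hour / time.Millisecond)   // 21600000 ms
	return opts
}
```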
+func DefaultOptions() *Options { + return &Options{ + WALSegmentSize: wal.DefaultSegmentSize, + WALCompression: false, + StripeSize: tsdb.DefaultStripeSize, + TruncateFrequency: DefaultTruncateFrequency, + MinWALTime: DefaultMinWALTime, + MaxWALTime: DefaultMaxWALTime, + } +} + +type dbMetrics struct { + r prometheus.Registerer + + numActiveSeries prometheus.Gauge + numWALSeriesPendingDeletion prometheus.Gauge + totalAppendedSamples prometheus.Counter + walTruncateDuration prometheus.Summary + walCorruptionsTotal prometheus.Counter + walTotalReplayDuration prometheus.Gauge + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter +} + +func newDBMetrics(r prometheus.Registerer) *dbMetrics { + m := dbMetrics{r: r} + m.numActiveSeries = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_agent_active_series", + Help: "Number of active series being tracked by the WAL storage", + }) + + m.numWALSeriesPendingDeletion = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_agent_deleted_series", + Help: "Number of series pending deletion from the WAL", + }) + + m.totalAppendedSamples = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_samples_appended_total", + Help: "Total number of samples appended to the storage", + }) + + m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "prometheus_agent_truncate_duration_seconds", + Help: "Duration of WAL truncation.", + }) + + m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_corruptions_total", + Help: "Total number of WAL corruptions.", + }) + + m.walTotalReplayDuration = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_agent_data_replay_duration_seconds", + Help: "Time taken to replay the data on disk.", + }) + + m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_checkpoint_deletions_failed_total", + Help: "Total number of checkpoint deletions that failed.", + }) + + m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_checkpoint_deletions_total", + Help: "Total number of checkpoint deletions attempted.", + }) + + m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_checkpoint_creations_failed_total", + Help: "Total number of checkpoint creations that failed.", + }) + + m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_agent_checkpoint_creations_total", + Help: "Total number of checkpoint creations attempted.", + }) + + if r != nil { + r.MustRegister( + m.numActiveSeries, + m.numWALSeriesPendingDeletion, + m.totalAppendedSamples, + m.walTruncateDuration, + m.walCorruptionsTotal, + m.walTotalReplayDuration, + m.checkpointDeleteFail, + m.checkpointDeleteTotal, + m.checkpointCreationFail, + m.checkpointCreationTotal, + ) + } + + return &m +} + +func (m *dbMetrics) Unregister() { + if m.r == nil { + return + } + cs := []prometheus.Collector{ + m.numActiveSeries, + m.numWALSeriesPendingDeletion, + m.totalAppendedSamples, + } + for _, c := range cs { + m.r.Unregister(c) + } +} + +// DB represents a WAL-only storage. It implements storage.DB. 
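All of these collectors are registered against the Registerer handed to newDBMetrics, so they can be read back in in-package tests. The snippet below is an illustrative sketch, not part of this diff (db_test.go further down uses its own gatherFamily helper instead), and assumes client_golang's testutil package:

```go
package agent

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

func TestDBMetricsSketch(t *testing.T) {
	reg := prometheus.NewRegistry()
	m := newDBMetrics(reg)

	m.numActiveSeries.Inc()
	require.Equal(t, 1.0, testutil.ToFloat64(m.numActiveSeries))

	// Unregister drops the series/deletion/samples collectors listed in
	// dbMetrics.Unregister above.
	m.Unregister()
}
```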
+type DB struct { + mtx sync.RWMutex + logger log.Logger + opts *Options + rs *remote.Storage + + wal *wal.WAL + + appenderPool sync.Pool + bufPool sync.Pool + + nextRef *atomic.Uint64 + series *stripeSeries + // deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they + // must be kept around to). + deleted map[uint64]int + + donec chan struct{} + stopc chan struct{} + + metrics *dbMetrics +} + +// Open returns a new agent.DB in the given directory. +func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) { + opts = validateOptions(opts) + + // remote_write expects WAL to be stored in a "wal" subdirectory of the main storage. + dir = filepath.Join(dir, "wal") + + w, err := wal.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression) + if err != nil { + return nil, errors.Wrap(err, "creating WAL") + } + + db := &DB{ + logger: l, + opts: opts, + rs: rs, + + wal: w, + + nextRef: atomic.NewUint64(0), + series: newStripeSeries(opts.StripeSize), + deleted: make(map[uint64]int), + + donec: make(chan struct{}), + stopc: make(chan struct{}), + + metrics: newDBMetrics(reg), + } + + db.bufPool.New = func() interface{} { + return make([]byte, 0, 1024) + } + + db.appenderPool.New = func() interface{} { + return &appender{ + DB: db, + pendingSeries: make([]record.RefSeries, 0, 100), + pendingSamples: make([]record.RefSample, 0, 100), + } + } + + if err := db.replayWAL(); err != nil { + level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err) + if err := w.Repair(err); err != nil { + return nil, errors.Wrap(err, "repair corrupted WAL") + } + } + + go db.run() + return db, nil +} + +func validateOptions(opts *Options) *Options { + if opts == nil { + opts = DefaultOptions() + } + if opts.WALSegmentSize <= 0 { + opts.WALSegmentSize = wal.DefaultSegmentSize + } + + // Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2. + if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) { + opts.StripeSize = tsdb.DefaultStripeSize + } + if opts.TruncateFrequency <= 0 { + opts.TruncateFrequency = DefaultTruncateFrequency + } + if opts.MinWALTime <= 0 { + opts.MinWALTime = 0 + } + if opts.MaxWALTime <= 0 { + opts.MaxWALTime = DefaultMaxWALTime + } + + if t := int64(opts.TruncateFrequency / time.Millisecond); opts.MaxWALTime < t { + opts.MaxWALTime = t + } + return opts +} + +func (db *DB) replayWAL() error { + level.Info(db.logger).Log("msg", "replaying WAL, this may take a while", "dir", db.wal.Dir()) + start := time.Now() + + dir, startFrom, err := wal.LastCheckpoint(db.wal.Dir()) + if err != nil && err != record.ErrNotFound { + return errors.Wrap(err, "find last checkpoint") + } + + multiRef := map[uint64]uint64{} + + if err == nil { + sr, err := wal.NewSegmentsReader(dir) + if err != nil { + return errors.Wrap(err, "open checkpoint") + } + defer func() { + if err := sr.Close(); err != nil { + level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err) + } + }() + + // A corrupted checkpoint is a hard error for now and requires user + // intervention. There's likely little data that can be recovered anyway. + if err := db.loadWAL(wal.NewReader(sr), multiRef); err != nil { + return errors.Wrap(err, "backfill checkpoint") + } + startFrom++ + level.Info(db.logger).Log("msg", "WAL checkpoint loaded") + } + + // Find the last segment.
+ _, last, err := wal.Segments(db.wal.Dir()) + if err != nil { + return errors.Wrap(err, "finding WAL segments") + } + + // Backfill segments from the most recent checkpoint onwards. + for i := startFrom; i <= last; i++ { + seg, err := wal.OpenReadSegment(wal.SegmentName(db.wal.Dir(), i)) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) + } + + sr := wal.NewSegmentBufReader(seg) + err = db.loadWAL(wal.NewReader(sr), multiRef) + if err := sr.Close(); err != nil { + level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err) + } + if err != nil { + return err + } + level.Info(db.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last) + } + + walReplayDuration := time.Since(start) + db.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds()) + + return nil +} + +func (db *DB) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { + var ( + dec record.Decoder + lastRef uint64 + + decoded = make(chan interface{}, 10) + errCh = make(chan error, 1) + seriesPool = sync.Pool{ + New: func() interface{} { + return []record.RefSeries{} + }, + } + samplesPool = sync.Pool{ + New: func() interface{} { + return []record.RefSample{} + }, + } + ) + + go func() { + defer close(decoded) + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + series := seriesPool.Get().([]record.RefSeries)[:0] + series, err = dec.Series(rec, series) + if err != nil { + errCh <- &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode series"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- series + case record.Samples: + samples := samplesPool.Get().([]record.RefSample)[:0] + samples, err = dec.Samples(rec, samples) + if err != nil { + errCh <- &wal.CorruptionErr{ + Err: errors.Wrap(err, "decode samples"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- samples + case record.Tombstones: + // We don't care about tombstones + continue + case record.Exemplars: + // We don't care about exemplars + continue + default: + errCh <- &wal.CorruptionErr{ + Err: errors.Errorf("invalid record type %v", dec.Type(rec)), + Segment: r.Segment(), + Offset: r.Offset(), + } + } + } + }() + + var nonExistentSeriesRefs atomic.Uint64 + + for d := range decoded { + switch v := d.(type) { + case []record.RefSeries: + for _, entry := range v { + // If this is a new series, create it in memory. If we never read in a + // sample for this series, its timestamp will remain at 0 and it will + // be deleted at the next GC.
+ if db.series.GetByID(entry.Ref) == nil { + series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0} + db.series.Set(entry.Labels.Hash(), series) + multiRef[entry.Ref] = series.ref + db.metrics.numActiveSeries.Inc() + if entry.Ref > lastRef { + lastRef = entry.Ref + } + } + } + + //nolint:staticcheck + seriesPool.Put(v) + case []record.RefSample: + for _, entry := range v { + // Update the lastTs for the series based on the sample's timestamp. + ref, ok := multiRef[entry.Ref] + if !ok { + nonExistentSeriesRefs.Inc() + continue + } + series := db.series.GetByID(ref) + if entry.T > series.lastTs { + series.lastTs = entry.T + } + } + + //nolint:staticcheck + samplesPool.Put(v) + default: + panic(fmt.Errorf("unexpected decoded type: %T", d)) + } + } + + if v := nonExistentSeriesRefs.Load(); v > 0 { + level.Warn(db.logger).Log("msg", "found sample referencing non-existing series", "skipped_series", v) + } + + db.nextRef.Store(lastRef) + + select { + case err := <-errCh: + return err + default: + if r.Err() != nil { + return errors.Wrap(r.Err(), "read records") + } + return nil + } +} + +func (db *DB) run() { + defer close(db.donec) + +Loop: + for { + select { + case <-db.stopc: + break Loop + case <-time.After(db.opts.TruncateFrequency): + // The timestamp ts is used to determine which series are not receiving + // samples and may be deleted from the WAL. Their most recent append + // timestamp is compared to ts, and if that timestamp is older than ts, + // they are considered inactive and may be deleted. + // + // Subtracting a duration from ts will add a buffer for when series are + // considered inactive and safe for deletion. + ts := db.rs.LowestSentTimestamp() - db.opts.MinWALTime + if ts < 0 { + ts = 0 + } + + // Network issues can prevent the result of rs.LowestSentTimestamp from + // changing. We don't want data in the WAL to grow forever, so we set a cap + // on the maximum age data can be. If our ts is older than this cutoff point, + // we'll shift it forward to start deleting very stale data. + if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS { + ts = maxTS + } + + level.Debug(db.logger).Log("msg", "truncating the WAL", "ts", ts) + if err := db.truncate(ts); err != nil { + level.Warn(db.logger).Log("msg", "failed to truncate WAL", "err", err) + } + } + } +} + +func (db *DB) truncate(mint int64) error { + db.mtx.RLock() + defer db.mtx.RUnlock() + + start := time.Now() + + db.gc(mint) + level.Info(db.logger).Log("msg", "series GC completed", "duration", time.Since(start)) + + first, last, err := wal.Segments(db.wal.Dir()) + if err != nil { + return errors.Wrap(err, "get segment range") + } + + // Start a new segment so low ingestion volume instances don't have more WAL + // than needed. + err = db.wal.NextSegment() + if err != nil { + return errors.Wrap(err, "next segment") + } + + last-- // Never consider most recent segment for checkpoint + if last < 0 { + return nil // no segments yet + } + + // The lower two-thirds of segments should contain mostly obsolete samples. + // If we have fewer than two segments, it's not worth checkpointing yet.
+ last = first + (last-first)*2/3 + if last <= first { + return nil + } + + keep := func(id uint64) bool { + if db.series.GetByID(id) != nil { + return true + } + + seg, ok := db.deleted[id] + return ok && seg >= first + } + + db.metrics.checkpointCreationTotal.Inc() + + if _, err = wal.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil { + db.metrics.checkpointCreationFail.Inc() + if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok { + db.metrics.walCorruptionsTotal.Inc() + } + return errors.Wrap(err, "create checkpoint") + } + if err := db.wal.Truncate(last + 1); err != nil { + // If truncating fails, we'll just try it again at the next checkpoint. + // Leftover segments will still just be ignored in the future if there's a + // checkpoint that supersedes them. + level.Error(db.logger).Log("msg", "truncating segments failed", "err", err) + } + + // The checkpoint is written and segments before it are truncated, so we + // no longer need to track deleted series that were being kept around. + for ref, segment := range db.deleted { + if segment < first { + delete(db.deleted, ref) + } + } + db.metrics.checkpointDeleteTotal.Inc() + db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) + + if err := wal.DeleteCheckpoints(db.wal.Dir(), last); err != nil { + // Leftover old checkpoints do not cause problems down the line beyond + // occupying disk space. They will just be ignored since a newer checkpoint + // exists. + level.Error(db.logger).Log("msg", "delete old checkpoints", "err", err) + db.metrics.checkpointDeleteFail.Inc() + } + + db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) + + level.Info(db.logger).Log("msg", "WAL checkpoint complete", "first", first, "last", last, "duration", time.Since(start)) + return nil +} + +// gc marks ref IDs that have not received a sample since mint as deleted in +// s.deleted, along with the segment where they originally got deleted. +func (db *DB) gc(mint int64) { + deleted := db.series.GC(mint) + db.metrics.numActiveSeries.Sub(float64(len(deleted))) + + _, last, _ := wal.Segments(db.wal.Dir()) + + // We want to keep series records for any newly deleted series + // until we've passed the last recorded segment. This prevents + // the WAL having samples for series records that no longer exist. + for ref := range deleted { + db.deleted[ref] = last + } + + db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) +} + +// StartTime implements the Storage interface. +func (db *DB) StartTime() (int64, error) { + return int64(model.Latest), nil +} + +// Querier implements the Storage interface. +func (db *DB) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return nil, ErrUnsupported +} + +// ChunkQuerier implements the Storage interface. +func (db *DB) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + return nil, ErrUnsupported +} + +// ExemplarQuerier implements the Storage interface. +func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { + return nil, ErrUnsupported +} + +// Appender implements storage.Storage. +func (db *DB) Appender(_ context.Context) storage.Appender { + return db.appenderPool.Get().(storage.Appender) +} + +// Close implements the Storage interface. 
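A worked example of the timestamp and segment arithmetic used by run() and truncate() above, with the default MinWALTime (5m) and MaxWALTime (4h); the concrete numbers are illustrative assumptions:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now().UnixNano() / int64(time.Millisecond)

	// Suppose remote_write last confirmed data up to 10 minutes ago.
	lowestSent := now - int64(10*time.Minute/time.Millisecond)

	minWALTime := int64(5 * time.Minute / time.Millisecond) // DefaultMinWALTime
	maxWALTime := int64(4 * time.Hour / time.Millisecond)   // DefaultMaxWALTime

	// run(): keep a MinWALTime buffer behind the lowest sent timestamp ...
	ts := lowestSent - minWALTime
	if ts < 0 {
		ts = 0
	}
	// ... but never keep more than MaxWALTime of data.
	if maxTS := now - maxWALTime; ts < maxTS {
		ts = maxTS
	}
	fmt.Println("truncate everything older than", ts) // here: now minus 15 minutes

	// truncate(): only the lower two-thirds of the closed segments are
	// considered for the checkpoint.
	first, last := 3, 9 // example segment range, most recent segment already excluded
	last = first + (last-first)*2/3
	fmt.Println("checkpoint segments", first, "through", last) // 3 through 7
}
```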
+func (db *DB) Close() error { + db.mtx.Lock() + defer db.mtx.Unlock() + + close(db.stopc) + <-db.donec + + db.metrics.Unregister() + + return db.wal.Close() +} + +type appender struct { + *DB + + pendingSeries []record.RefSeries + pendingSamples []record.RefSample +} + +func (a *appender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) { + if ref == 0 { + return a.Add(l, t, v) + } + return ref, a.AddFast(ref, t, v) +} + +func (a *appender) Add(l labels.Labels, t int64, v float64) (uint64, error) { + hash := l.Hash() + series := a.series.GetByHash(hash, l) + if series != nil { + return series.ref, a.AddFast(series.ref, t, v) + } + + // Ensure no empty or duplicate labels have gotten through. This mirrors the + // equivalent validation code in the TSDB's headAppender. + l = l.WithoutEmpty() + if len(l) == 0 { + return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + } + + ref := a.nextRef.Inc() + series = &memSeries{ref: ref, lset: l, lastTs: t} + + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: ref, + Labels: l, + }) + a.pendingSamples = append(a.pendingSamples, record.RefSample{ + Ref: ref, + T: t, + V: v, + }) + + a.series.Set(hash, series) + + a.metrics.numActiveSeries.Inc() + a.metrics.totalAppendedSamples.Inc() + + return series.ref, nil +} + +func (a *appender) AddFast(ref uint64, t int64, v float64) error { + series := a.series.GetByID(ref) + if series == nil { + return storage.ErrNotFound + } + series.Lock() + defer series.Unlock() + + // Update last recorded timestamp. Used by Storage.gc to determine if a + // series is dead. + series.lastTs = t + + a.pendingSamples = append(a.pendingSamples, record.RefSample{ + Ref: ref, + T: t, + V: v, + }) + + a.metrics.totalAppendedSamples.Inc() + return nil +} + +func (a *appender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + // remote_write doesn't support exemplars yet, so do nothing here. + return 0, nil +} + +// Commit submits the collected samples and purges the batch. +func (a *appender) Commit() error { + a.mtx.RLock() + defer a.mtx.RUnlock() + + var encoder record.Encoder + buf := a.bufPool.Get().([]byte) + + if len(a.pendingSeries) > 0 { + buf = encoder.Series(a.pendingSeries, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + if len(a.pendingSamples) > 0 { + buf = encoder.Samples(a.pendingSamples, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + //nolint:staticcheck + a.bufPool.Put(buf) + return a.Rollback() +} + +func (a *appender) Rollback() error { + a.pendingSeries = a.pendingSeries[:0] + a.pendingSamples = a.pendingSamples[:0] + a.appenderPool.Put(a) + return nil +} diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go new file mode 100644 index 000000000..e1a6106a3 --- /dev/null +++ b/tsdb/agent/db_test.go @@ -0,0 +1,449 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "context" + "io/ioutil" + "os" + "strconv" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/prometheus/prometheus/tsdb/wal" + "github.com/stretchr/testify/require" +) + +func TestUnsupported(t *testing.T) { + promAgentDir, err := ioutil.TempDir("", "TestUnsupported") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(promAgentDir)) + }) + + opts := DefaultOptions() + logger := log.NewNopLogger() + + s, err := Open(logger, prometheus.DefaultRegisterer, nil, promAgentDir, opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + defer s.Close() + + t.Run("Querier", func(t *testing.T) { + _, err := s.Querier(context.TODO(), 0, 0) + require.Equal(t, err, ErrUnsupported) + }) + + t.Run("ChunkQuerier", func(t *testing.T) { + _, err := s.ChunkQuerier(context.TODO(), 0, 0) + require.Equal(t, err, ErrUnsupported) + }) + + t.Run("ExemplarQuerier", func(t *testing.T) { + _, err := s.ExemplarQuerier(context.TODO()) + require.Equal(t, err, ErrUnsupported) + }) +} + +func TestCommit(t *testing.T) { + const ( + numDatapoints = 1000 + numSeries = 8 + ) + + promAgentDir, err := ioutil.TempDir("", t.Name()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(promAgentDir)) + }) + + lbls := labelsForTest(t.Name(), numSeries) + opts := DefaultOptions() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) + + s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + a := s.Appender(context.TODO()) + + for _, l := range lbls { + lset := labels.New(l...) + + for i := 0; i < numDatapoints; i++ { + sample := tsdbutil.GenerateSamples(0, 1) + _, err := a.Append(0, lset, sample[0].T(), sample[0].V()) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + } + + // Read records from WAL and check for expected count of series and samples. 
+ walSeriesCount := 0 + walSamplesCount := 0 + + reg = prometheus.NewRegistry() + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) + + s, err = Open(logger, nil, remoteStorage, promAgentDir, opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + var dec record.Decoder + + if err == nil { + sr, err := wal.NewSegmentsReader(s.wal.Dir()) + require.NoError(t, err) + + r := wal.NewReader(sr) + seriesPool := sync.Pool{ + New: func() interface{} { + return []record.RefSeries{} + }, + } + samplesPool := sync.Pool{ + New: func() interface{} { + return []record.RefSample{} + }, + } + + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + series := seriesPool.Get().([]record.RefSeries)[:0] + series, _ = dec.Series(rec, series) + walSeriesCount += len(series) + case record.Samples: + samples := samplesPool.Get().([]record.RefSample)[:0] + samples, _ = dec.Samples(rec, samples) + walSamplesCount += len(samples) + default: + } + } + } + + // Retrieved series count from WAL should match the count of series that were added to the WAL. + require.Equal(t, walSeriesCount, numSeries) + + // Retrieved samples count from WAL should match the count of samples that were added to the WAL. + require.Equal(t, walSamplesCount, numSeries*numDatapoints) +} + +func TestRollback(t *testing.T) { + const ( + numDatapoints = 1000 + numSeries = 8 + ) + + promAgentDir, err := ioutil.TempDir("", t.Name()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(promAgentDir)) + }) + + lbls := labelsForTest(t.Name(), numSeries) + opts := DefaultOptions() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) + + s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + a := s.Appender(context.TODO()) + + for _, l := range lbls { + lset := labels.New(l...) + + for i := 0; i < numDatapoints; i++ { + sample := tsdbutil.GenerateSamples(0, 1) + _, err := a.Append(0, lset, sample[0].T(), sample[0].V()) + require.NoError(t, err) + } + } + + require.NoError(t, a.Rollback()) + + // Read records from WAL and check for expected count of series and samples.
+ walSeriesCount := 0 + walSamplesCount := 0 + + reg = prometheus.NewRegistry() + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) + + s, err = Open(logger, nil, remoteStorage, promAgentDir, opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + var dec record.Decoder + + if err == nil { + sr, err := wal.NewSegmentsReader(s.wal.Dir()) + require.NoError(t, err) + + r := wal.NewReader(sr) + seriesPool := sync.Pool{ + New: func() interface{} { + return []record.RefSeries{} + }, + } + samplesPool := sync.Pool{ + New: func() interface{} { + return []record.RefSample{} + }, + } + + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + series := seriesPool.Get().([]record.RefSeries)[:0] + series, _ = dec.Series(rec, series) + walSeriesCount += len(series) + case record.Samples: + samples := samplesPool.Get().([]record.RefSample)[:0] + samples, _ = dec.Samples(rec, samples) + walSamplesCount += len(samples) + default: + } + } + } + + // Retrieved series count from WAL should be zero. + require.Equal(t, walSeriesCount, 0) + + // Retrieved samples count from WAL should be zero. + require.Equal(t, walSamplesCount, 0) +} + +func TestFullTruncateWAL(t *testing.T) { + const ( + numDatapoints = 1000 + numSeries = 800 + lastTs = 500 + ) + + promAgentDir, err := ioutil.TempDir("", t.Name()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(promAgentDir)) + }) + + lbls := labelsForTest(t.Name(), numSeries) + opts := DefaultOptions() + opts.TruncateFrequency = time.Minute * 2 + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) + + s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + a := s.Appender(context.TODO()) + + for _, l := range lbls { + lset := labels.New(l...) + + for i := 0; i < numDatapoints; i++ { + _, err := a.Append(0, lset, int64(lastTs), 0) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + } + + // Truncate WAL with mint to GC all the samples. + s.truncate(lastTs + 1) + + m := gatherFamily(t, reg, "prometheus_agent_deleted_series") + require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") +} + +func TestPartialTruncateWAL(t *testing.T) { + const ( + numDatapoints = 1000 + numSeries = 800 + ) + + promAgentDir, err := ioutil.TempDir("", t.Name()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(promAgentDir)) + }) + + opts := DefaultOptions() + opts.TruncateFrequency = time.Minute * 2 + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) + + s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + a := s.Appender(context.TODO()) + + var lastTs int64 + + // Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500. + lastTs = 500 + lbls := labelsForTest(t.Name()+"batch-1", numSeries) + for _, l := range lbls { + lset := labels.New(l...) 
+ + for i := 0; i < numDatapoints; i++ { + _, err := a.Append(0, lset, lastTs, 0) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + } + + // Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600. + lastTs = 600 + + lbls = labelsForTest(t.Name()+"batch-2", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + for i := 0; i < numDatapoints; i++ { + _, err := a.Append(0, lset, lastTs, 0) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + } + + // Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series. + s.truncate(lastTs - 1) + + m := gatherFamily(t, reg, "prometheus_agent_deleted_series") + require.Equal(t, m.Metric[0].Gauge.GetValue(), float64(numSeries), "agent wal truncate mismatch of deleted series count") +} + +func TestWALReplay(t *testing.T) { + const ( + numDatapoints = 1000 + numSeries = 8 + lastTs = 500 + ) + + promAgentDir, err := ioutil.TempDir("", t.Name()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(promAgentDir)) + }) + + lbls := labelsForTest(t.Name(), numSeries) + opts := DefaultOptions() + + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil) + + s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + a := s.Appender(context.TODO()) + + for _, l := range lbls { + lset := labels.New(l...) + + for i := 0; i < numDatapoints; i++ { + _, err := a.Append(0, lset, lastTs, 0) + require.NoError(t, err) + } + } + + require.NoError(t, a.Commit()) + + restartOpts := DefaultOptions() + restartLogger := log.NewNopLogger() + restartReg := prometheus.NewRegistry() + + s, err = Open(restartLogger, restartReg, nil, promAgentDir, restartOpts) + if err != nil { + t.Fatalf("unable to create storage for the agent: %v", err) + } + + // Check if all the series are retrieved back from the WAL. + m := gatherFamily(t, restartReg, "prometheus_agent_active_series") + require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count") + + // Check if lastTs of the samples retrieved from the WAL is retained. + metrics := s.series.series + for i := 0; i < len(metrics); i++ { + mp := metrics[i] + for _, v := range mp { + require.Equal(t, v.lastTs, int64(lastTs)) + } + } +} + +func startTime() (int64, error) { + return time.Now().Unix() * 1000, nil +} + +// Create series for tests. 
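The expectations in TestFullTruncateWAL and TestPartialTruncateWAL above follow directly from the GC predicate in stripeSeries.GC (series.go below): a series survives truncation only if its lastTs is at least the truncation mint. A tiny worked check using the numbers from TestPartialTruncateWAL:

```go
package main

import "fmt"

func main() {
	mint := int64(600 - 1) // s.truncate(lastTs - 1) with lastTs = 600
	batch1 := int64(500)   // first batch: stale, expected to be deleted
	batch2 := int64(600)   // second batch: still fresh, expected to be kept

	fmt.Println("batch 1 kept:", batch1 >= mint) // false -> 800 series deleted
	fmt.Println("batch 2 kept:", batch2 >= mint) // true  -> 800 series retained
}
```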
+func labelsForTest(lName string, seriesCount int) []labels.Labels { + var series []labels.Labels + + for i := 0; i < seriesCount; i++ { + lset := labels.Labels{ + {Name: "a", Value: lName}, + {Name: "job", Value: "prometheus"}, + {Name: "instance", Value: "localhost" + strconv.Itoa(i)}, + } + series = append(series, lset) + } + + return series +} + +func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto.MetricFamily { + t.Helper() + + families, err := reg.Gather() + require.NoError(t, err, "failed to gather metrics") + + for _, f := range families { + if f.GetName() == familyName { + return f + } + } + + t.Fatalf("could not find family %s", familyName) + + return nil +} diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go new file mode 100644 index 000000000..71b3ca2e2 --- /dev/null +++ b/tsdb/agent/series.go @@ -0,0 +1,177 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "sync" + + "github.com/prometheus/prometheus/pkg/labels" +) + +// memSeries is a chunkless version of tsdb.memSeries. +type memSeries struct { + sync.Mutex + + ref uint64 + lset labels.Labels + lastTs int64 +} + +// seriesHashmap is a simple hashmap for memSeries by their label set. +// It is built on top of a regular hashmap and holds a slice of series to +// resolve hash collisions. Its methods require the hash to be submitted +// with the label set to avoid re-computing hash throughout the code. +type seriesHashmap map[uint64][]*memSeries + +func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries { + for _, s := range m[hash] { + if labels.Equal(s.lset, lset) { + return s + } + } + return nil +} + +func (m seriesHashmap) Set(hash uint64, s *memSeries) { + seriesSet := m[hash] + for i, prev := range seriesSet { + if labels.Equal(prev.lset, s.lset) { + seriesSet[i] = s + return + } + } + m[hash] = append(seriesSet, s) +} + +func (m seriesHashmap) Delete(hash uint64, ref uint64) { + var rem []*memSeries + for _, s := range m[hash] { + if s.ref != ref { + rem = append(rem, s) + } + } + if len(rem) == 0 { + delete(m, hash) + } else { + m[hash] = rem + } +} + +// stripeSeries locks modulo ranges of IDs and hashes to reduce lock +// contention. The locks are padded to not be on the same cache line. +// Filling the padded space with the maps was profiled to be slower - +// likely due to the additional pointer dereferences. +type stripeSeries struct { + size int + series []map[uint64]*memSeries + hashes []seriesHashmap + locks []stripeLock +} + +type stripeLock struct { + sync.RWMutex + // Padding to avoid multiple locks being on the same cache line. 
+	_ [40]byte
+}
+
+func newStripeSeries(stripeSize int) *stripeSeries {
+	s := &stripeSeries{
+		size:   stripeSize,
+		series: make([]map[uint64]*memSeries, stripeSize),
+		hashes: make([]seriesHashmap, stripeSize),
+		locks:  make([]stripeLock, stripeSize),
+	}
+	for i := range s.series {
+		s.series[i] = map[uint64]*memSeries{}
+	}
+	for i := range s.hashes {
+		s.hashes[i] = seriesHashmap{}
+	}
+	return s
+}
+
+// GC garbage collects old series that have not received a sample after mint
+// and will fully delete them.
+func (s *stripeSeries) GC(mint int64) map[uint64]struct{} {
+	deleted := map[uint64]struct{}{}
+
+	for hashLock := 0; hashLock < s.size; hashLock++ {
+		s.locks[hashLock].Lock()
+
+		for hash, all := range s.hashes[hashLock] {
+			for _, series := range all {
+				series.Lock()
+
+				// Any series that has received a write since mint is still alive.
+				if series.lastTs >= mint {
+					series.Unlock()
+					continue
+				}
+
+				// The series is stale. We need to obtain a second lock for the
+				// ref if it's different than the hash lock.
+				refLock := int(series.ref) & (s.size - 1)
+				if hashLock != refLock {
+					s.locks[refLock].Lock()
+				}
+
+				deleted[series.ref] = struct{}{}
+				delete(s.series[refLock], series.ref)
+				s.hashes[hashLock].Delete(hash, series.ref)
+
+				if hashLock != refLock {
+					s.locks[refLock].Unlock()
+				}
+				series.Unlock()
+			}
+		}
+
+		s.locks[hashLock].Unlock()
+	}
+
+	return deleted
+}
+
+func (s *stripeSeries) GetByID(id uint64) *memSeries {
+	refLock := id & uint64(s.size-1)
+
+	s.locks[refLock].RLock()
+	defer s.locks[refLock].RUnlock()
+	return s.series[refLock][id]
+}
+
+func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
+	hashLock := hash & uint64(s.size-1)
+
+	s.locks[hashLock].RLock()
+	defer s.locks[hashLock].RUnlock()
+	return s.hashes[hashLock].Get(hash, lset)
+}
+
+func (s *stripeSeries) Set(hash uint64, series *memSeries) {
+	var (
+		hashLock = hash & uint64(s.size-1)
+		refLock  = series.ref & uint64(s.size-1)
+	)
+	s.locks[hashLock].Lock()
+	defer s.locks[hashLock].Unlock()
+
+	if hashLock != refLock {
+		s.locks[refLock].Lock()
+		defer s.locks[refLock].Unlock()
+	}
+
+	s.hashes[hashLock].Set(hash, series)
+	s.series[refLock][series.ref] = series
+}
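To make the striped index above concrete, here is a minimal sketch of its lifecycle, written as if it lived in package agent since the types are unexported; the stripe count, ref, and timestamps are illustrative only and not part of this change:

package agent

import (
	"testing"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"
)

// Illustrative sketch, not part of this change.
func TestStripeSeriesSketch(t *testing.T) {
	// The stripe count must be a power of two: locks are selected with `& (size-1)`.
	stripes := newStripeSeries(16)

	lset := labels.FromStrings("__name__", "up", "job", "prometheus")
	series := &memSeries{ref: 1, lset: lset, lastTs: 500}

	// A series is indexed twice: by label hash (collisions handled by
	// seriesHashmap) and by ref, so it can be resolved either way.
	stripes.Set(lset.Hash(), series)
	require.Equal(t, series, stripes.GetByHash(lset.Hash(), lset))
	require.Equal(t, series, stripes.GetByID(1))

	// GC(mint) removes every series whose lastTs is strictly below mint
	// and reports the deleted refs.
	deleted := stripes.GC(501)
	require.Contains(t, deleted, series.ref)
	require.Nil(t, stripes.GetByID(1))
}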
r.Get("/query_range", wrap(api.queryRange)) - r.Post("/query_range", wrap(api.queryRange)) - r.Get("/query_exemplars", wrap(api.queryExemplars)) - r.Post("/query_exemplars", wrap(api.queryExemplars)) + r.Get("/query", wrapAgent(api.query)) + r.Post("/query", wrapAgent(api.query)) + r.Get("/query_range", wrapAgent(api.queryRange)) + r.Post("/query_range", wrapAgent(api.queryRange)) + r.Get("/query_exemplars", wrapAgent(api.queryExemplars)) + r.Post("/query_exemplars", wrapAgent(api.queryExemplars)) - r.Get("/labels", wrap(api.labelNames)) - r.Post("/labels", wrap(api.labelNames)) - r.Get("/label/:name/values", wrap(api.labelValues)) + r.Get("/labels", wrapAgent(api.labelNames)) + r.Post("/labels", wrapAgent(api.labelNames)) + r.Get("/label/:name/values", wrapAgent(api.labelValues)) - r.Get("/series", wrap(api.series)) - r.Post("/series", wrap(api.series)) - r.Del("/series", wrap(api.dropSeries)) + r.Get("/series", wrapAgent(api.series)) + r.Post("/series", wrapAgent(api.series)) + r.Del("/series", wrapAgent(api.dropSeries)) r.Get("/targets", wrap(api.targets)) r.Get("/targets/metadata", wrap(api.targetMetadata)) - r.Get("/alertmanagers", wrap(api.alertmanagers)) + r.Get("/alertmanagers", wrapAgent(api.alertmanagers)) r.Get("/metadata", wrap(api.metricMetadata)) @@ -309,22 +321,22 @@ func (api *API) Register(r *route.Router) { r.Get("/status/runtimeinfo", wrap(api.serveRuntimeInfo)) r.Get("/status/buildinfo", wrap(api.serveBuildInfo)) r.Get("/status/flags", wrap(api.serveFlags)) - r.Get("/status/tsdb", wrap(api.serveTSDBStatus)) + r.Get("/status/tsdb", wrapAgent(api.serveTSDBStatus)) r.Get("/status/walreplay", api.serveWALReplayStatus) r.Post("/read", api.ready(api.remoteRead)) r.Post("/write", api.ready(api.remoteWrite)) - r.Get("/alerts", wrap(api.alerts)) - r.Get("/rules", wrap(api.rules)) + r.Get("/alerts", wrapAgent(api.alerts)) + r.Get("/rules", wrapAgent(api.rules)) // Admin APIs - r.Post("/admin/tsdb/delete_series", wrap(api.deleteSeries)) - r.Post("/admin/tsdb/clean_tombstones", wrap(api.cleanTombstones)) - r.Post("/admin/tsdb/snapshot", wrap(api.snapshot)) + r.Post("/admin/tsdb/delete_series", wrapAgent(api.deleteSeries)) + r.Post("/admin/tsdb/clean_tombstones", wrapAgent(api.cleanTombstones)) + r.Post("/admin/tsdb/snapshot", wrapAgent(api.snapshot)) - r.Put("/admin/tsdb/delete_series", wrap(api.deleteSeries)) - r.Put("/admin/tsdb/clean_tombstones", wrap(api.cleanTombstones)) - r.Put("/admin/tsdb/snapshot", wrap(api.snapshot)) + r.Put("/admin/tsdb/delete_series", wrapAgent(api.deleteSeries)) + r.Put("/admin/tsdb/clean_tombstones", wrapAgent(api.cleanTombstones)) + r.Put("/admin/tsdb/snapshot", wrapAgent(api.snapshot)) } type queryData struct { diff --git a/web/web.go b/web/web.go index db0069fb6..269abd0bf 100644 --- a/web/web.go +++ b/web/web.go @@ -248,6 +248,7 @@ type Options struct { RemoteReadConcurrencyLimit int RemoteReadBytesInFrame int RemoteWriteReceiver bool + IsAgent bool Gatherer prometheus.Gatherer Registerer prometheus.Registerer @@ -328,6 +329,7 @@ func New(logger log.Logger, o *Options) *Handler { h.options.RemoteReadSampleLimit, h.options.RemoteReadConcurrencyLimit, h.options.RemoteReadBytesInFrame, + h.options.IsAgent, h.options.CORSOrigin, h.runtimeInfo, h.versionInfo, diff --git a/web/web_test.go b/web/web_test.go index 399e55266..868fd08e6 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -526,6 +526,68 @@ func TestHandleMultipleQuitRequests(t *testing.T) { } } +// Test for availability of API endpoints in Prometheus Agent mode. 
diff --git a/web/web.go b/web/web.go
index db0069fb6..269abd0bf 100644
--- a/web/web.go
+++ b/web/web.go
@@ -248,6 +248,7 @@ type Options struct {
 	RemoteReadConcurrencyLimit int
 	RemoteReadBytesInFrame     int
 	RemoteWriteReceiver        bool
+	IsAgent                    bool
 
 	Gatherer   prometheus.Gatherer
 	Registerer prometheus.Registerer
@@ -328,6 +329,7 @@ func New(logger log.Logger, o *Options) *Handler {
 		h.options.RemoteReadSampleLimit,
 		h.options.RemoteReadConcurrencyLimit,
 		h.options.RemoteReadBytesInFrame,
+		h.options.IsAgent,
 		h.options.CORSOrigin,
 		h.runtimeInfo,
 		h.versionInfo,
diff --git a/web/web_test.go b/web/web_test.go
index 399e55266..868fd08e6 100644
--- a/web/web_test.go
+++ b/web/web_test.go
@@ -526,6 +526,68 @@ func TestHandleMultipleQuitRequests(t *testing.T) {
 	}
 }
 
+// Test for availability of API endpoints in Prometheus Agent mode.
+func TestAgentAPIEndPoints(t *testing.T) {
+	t.Parallel()
+
+	opts := &Options{
+		ListenAddress:  ":9090",
+		ReadTimeout:    30 * time.Second,
+		MaxConnections: 512,
+		Context:        nil,
+		Storage:        nil,
+		QueryEngine:    nil,
+		ScrapeManager:  &scrape.Manager{},
+		RuleManager:    &rules.Manager{},
+		Notifier:       nil,
+		RoutePrefix:    "/",
+		EnableAdminAPI: true,
+		ExternalURL: &url.URL{
+			Scheme: "http",
+			Host:   "localhost:9090",
+			Path:   "/",
+		},
+		Version:  &PrometheusVersion{},
+		Gatherer: prometheus.DefaultGatherer,
+		IsAgent:  true,
+	}
+
+	opts.Flags = map[string]string{}
+
+	webHandler := New(nil, opts)
+	webHandler.Ready()
+
+	// Test the endpoints that are not available in Agent mode.
+	for _, u := range []string{
+		"http://localhost:9090/-/labels",
+		"http://localhost:9090/label",
+		"http://localhost:9090/series",
+		"http://localhost:9090/alertmanagers",
+		"http://localhost:9090/query",
+		"http://localhost:9090/query_range",
+		"http://localhost:9090/query_exemplars",
+	} {
+		w := httptest.NewRecorder()
+		req, err := http.NewRequest("GET", u, nil)
+		require.NoError(t, err)
+		webHandler.router.ServeHTTP(w, req)
+		require.Equal(t, http.StatusNotFound, w.Code)
+	}
+
+	// Test the endpoints that remain available in Agent mode.
+	for _, u := range []string{
+		"http://localhost:9090/targets",
+		"http://localhost:9090/status",
+	} {
+		w := httptest.NewRecorder()
+		req, err := http.NewRequest("GET", u, nil)
+		require.NoError(t, err)
+		webHandler.router.ServeHTTP(w, req)
+		fmt.Println(u)
+		require.Equal(t, http.StatusOK, w.Code)
+	}
+}
+
 func cleanupTestResponse(t *testing.T, resp *http.Response) {
 	_, err := io.Copy(ioutil.Discard, resp.Body)
 	require.NoError(t, err)