diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 2eed8e7ce..c042143ff 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -193,7 +193,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { c.scrape.ExtraMetrics = true level.Info(logger).Log("msg", "Experimental additional scrape metrics enabled") case "metadata-wal-records": - c.scrape.EnableMetadataStorage = true + c.scrape.AppendMetadata = true level.Info(logger).Log("msg", "Experimental metadata records in WAL enabled, required for remote write 2.0") case "new-service-discovery-manager": c.enableNewSDManager = true @@ -634,7 +634,7 @@ func main() { var ( localStorage = &readyStorage{stats: tsdb.NewDBStats()} scraper = &readyScrapeManager{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, remote.RemoteWriteFormat(cfg.rwFormat), cfg.scrape.EnableMetadataStorage) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.AppendMetadata) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) @@ -819,7 +819,7 @@ func main() { cfg.web.Flags[f.Name] = f.Value.String() } - cfg.web.RemoteWriteFormat = remote.RemoteWriteFormat(cfg.rwFormat) + cfg.web.RemoteWriteFormat = config.RemoteWriteFormat(cfg.rwFormat) // Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager. 
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) diff --git a/config/config.go b/config/config.go index 7fa03a144..82dd29d65 100644 --- a/config/config.go +++ b/config/config.go @@ -1016,6 +1016,9 @@ func CheckTargetAddress(address model.LabelValue) error { return nil } +// RemoteWriteFormat is the remote write protocol version. It needs to live here rather than in the remote package to avoid an import cycle. +type RemoteWriteFormat int64 + // RemoteWriteConfig is the configuration for writing to remote storage. type RemoteWriteConfig struct { URL *config.URL `yaml:"url"` @@ -1025,6 +1028,7 @@ type RemoteWriteConfig struct { Name string `yaml:"name,omitempty"` SendExemplars bool `yaml:"send_exemplars,omitempty"` SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"` + ProtocolVersion RemoteWriteFormat `yaml:"remote_write_version,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. diff --git a/scrape/manager.go b/scrape/manager.go index 3ad315a50..446bc3dc0 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -73,9 +73,11 @@ type Options struct { // Option used by downstream scraper users like OpenTelemetry Collector // to help lookup metric metadata. Should be false for Prometheus. PassMetadataInContext bool - // Option to enable the experimental in-memory metadata storage and append - // metadata to the WAL. - EnableMetadataStorage bool + // Option to enable appending of scraped Metadata to the TSDB/other appenders. Individual appenders + // can decide what to do with metadata, but for practical purposes this flag exists so that metadata + // can be written to the WAL and thus read for remote write. + // TODO: implement some form of metadata storage + AppendMetadata bool // Option to increase the interval used by scrape manager to throttle target groups updates. 
DiscoveryReloadInterval model.Duration // Option to enable the ingestion of the created timestamp as a synthetic zero sample. diff --git a/scrape/scrape.go b/scrape/scrape.go index dfa945852..bde5249d0 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -173,7 +173,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed opts.scrapeClassicHistograms, options.EnableCreatedTimestampZeroIngestion, options.ExtraMetrics, - options.EnableMetadataStorage, + options.AppendMetadata, opts.target, options.PassMetadataInContext, metrics, diff --git a/storage/remote/client.go b/storage/remote/client.go index 19f409b11..ddceac3b2 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -32,10 +32,12 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/sigv4" "github.com/prometheus/common/version" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote/azuread" ) @@ -81,9 +83,9 @@ func init() { // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - remoteName string // Used to differentiate clients in metrics. - urlString string // url.String() - rwFormat RemoteWriteFormat // For write clients, ignored for read clients. + remoteName string // Used to differentiate clients in metrics. + urlString string // url.String() + rwFormat config.RemoteWriteFormat // For write clients, ignored for read clients. Client *http.Client timeout time.Duration @@ -97,7 +99,7 @@ type Client struct { // ClientConfig configures a client. 
type ClientConfig struct { URL *config_util.URL - RemoteWriteFormat RemoteWriteFormat + RemoteWriteFormat config.RemoteWriteFormat Timeout model.Duration HTTPClientConfig config_util.HTTPClientConfig SigV4Config *sigv4.SigV4Config @@ -215,7 +217,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue) } else { // Set the right header if we're using v1.1 remote write protocol - httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) + httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion2HeaderValue) } if attempt > 0 { diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index abd726821..750fb612c 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -568,8 +568,7 @@ func TestDecodeWriteRequest(t *testing.T) { } func TestDecodeMinWriteRequest(t *testing.T) { - buf, _, _, err := buildMinimizedWriteRequestStr(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil) - + buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil) require.NoError(t, err) actual, err := DecodeMinimizedWriteRequestStr(bytes.NewReader(buf)) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 980dcfbab..4dc2b834e 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -394,11 +394,9 @@ type WriteClient interface { Endpoint() string } -type RemoteWriteFormat int64 //nolint:revive // exported. - const ( - Version1 RemoteWriteFormat = iota // 1.0, 0.1, etc. - Version2 // symbols are indices into an array of strings + Version1 config.RemoteWriteFormat = iota // 1.0, 0.1, etc. 
+ Version2 // symbols are indices into an array of strings ) // QueueManager manages a queue of samples to be sent to the Storage @@ -419,7 +417,7 @@ type QueueManager struct { watcher *wlog.Watcher metadataWatcher *MetadataWatcher // experimental feature, new remote write proto format - rwFormat RemoteWriteFormat + rwFormat config.RemoteWriteFormat clientMtx sync.RWMutex storeClient WriteClient @@ -468,7 +466,7 @@ func NewQueueManager( sm ReadyScrapeManager, enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, - rwFormat RemoteWriteFormat, + rwFormat config.RemoteWriteFormat, ) *QueueManager { if logger == nil { logger = log.NewNopLogger() @@ -1659,16 +1657,15 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen return nPendingSamples, nPendingExemplars, nPendingHistograms } -func (s *shards) sendSamples(ctx context.Context, series []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { begin := time.Now() - err := s.sendSamplesWithBackoff(ctx, series, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf) - + err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin)) } func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte) { begin := time.Now() - err := s.sendV2SamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, metadataCount, labels, pBuf, buf) + err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf) s.updateMetrics(ctx, err, sampleCount, 
exemplarCount, histogramCount, metadataCount, time.Since(begin)) } @@ -1786,9 +1783,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti } // sendV2Samples to the remote storage with backoff for recoverable errors. -func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, labels []string, pBuf, buf *[]byte) error { +func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte) error { // Build the WriteRequest with no metadata. - req, highest, lowest, err := buildMinimizedWriteRequestStr(s.qm.logger, samples, labels, pBuf, buf, nil) + req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil) s.qm.buildRequestLimitTimestamp.Store(lowest) if err != nil { // Failing to build the write request is non-recoverable, since it will @@ -1807,7 +1804,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2 lowest := s.qm.buildRequestLimitTimestamp.Load() if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) { // This will filter out old samples during retries. 
- req, _, lowest, err := buildMinimizedWriteRequestStr( + req, _, lowest, err := buildV2WriteRequest( s.qm.logger, samples, labels, @@ -2102,9 +2099,8 @@ func (r *rwSymbolTable) clear() { } } -func buildMinimizedWriteRequestStr(logger log.Logger, timeSeries []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool) ([]byte, int64, int64, error) { - highest, lowest, timeSeries, - droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(timeSeries, filter) +func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool) ([]byte, int64, int64, error) { + highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter) if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 { level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 6c879d7c5..67c09a39d 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -68,7 +68,7 @@ func TestSampleDelivery(t *testing.T) { exemplars bool histograms bool floatHistograms bool - rwFormat RemoteWriteFormat + rwFormat config.RemoteWriteFormat }{ {samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"}, {samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"}, @@ -108,7 +108,7 @@ func TestSampleDelivery(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.rwFormat, true) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true) defer s.Close() var ( @@ 
-138,6 +138,8 @@ func TestSampleDelivery(t *testing.T) { // Apply new config. queueConfig.Capacity = len(samples) queueConfig.MaxSamplesPerSend = len(samples) / 2 + // For now we only ever have a single rw config in this test. + conf.RemoteWriteConfigs[0].ProtocolVersion = tc.rwFormat require.NoError(t, s.ApplyConfig(conf)) hash, err := toHash(writeConfig) require.NoError(t, err) @@ -385,7 +387,7 @@ func TestMetadataDelivery(t *testing.T) { func TestWALMetadataDelivery(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version2, true) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true) defer s.Close() cfg := config.DefaultQueueConfig @@ -394,6 +396,7 @@ func TestWALMetadataDelivery(t *testing.T) { writeConfig := baseRemoteWriteConfig("http://test-storage.com") writeConfig.QueueConfig = cfg + writeConfig.ProtocolVersion = Version2 conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, @@ -424,7 +427,7 @@ func TestWALMetadataDelivery(t *testing.T) { } func TestSampleDeliveryTimeout(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} { + for _, rwFormat := range []config.RemoteWriteFormat{Version1, Version2} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { // Let's send one less sample than batch size, and wait the timeout duration n := 9 @@ -456,7 +459,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { } func TestSampleDeliveryOrder(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} { + for _, rwFormat := range []config.RemoteWriteFormat{Version1, Version2} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts @@ -558,7 +561,7 @@ func TestSeriesReset(t *testing.T) { } func TestReshard(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} { + for _, rwFormat := range []config.RemoteWriteFormat{Version1, Version2} { 
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { size := 10 // Make bigger to find more races. nSeries := 6 @@ -601,7 +604,7 @@ func TestReshard(t *testing.T) { } func TestReshardRaceWithStop(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} { + for _, rwFormat := range []config.RemoteWriteFormat{Version1, Version2} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { c := NewTestWriteClient(rwFormat) var m *QueueManager @@ -639,7 +642,7 @@ func TestReshardRaceWithStop(t *testing.T) { } func TestReshardPartialBatch(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} { + for _, rwFormat := range []config.RemoteWriteFormat{Version1, Version2} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { samples, series := createTimeseries(1, 10) @@ -685,7 +688,7 @@ func TestReshardPartialBatch(t *testing.T) { // where a large scrape (> capacity + max samples per send) is appended at the // same time as a batch times out according to the batch send deadline. 
func TestQueueFilledDeadlock(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} { + for _, rwFormat := range []config.RemoteWriteFormat{Version1, Version2} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { samples, series := createTimeseries(50, 1) @@ -727,7 +730,7 @@ func TestQueueFilledDeadlock(t *testing.T) { } func TestReleaseNoninternedString(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} { + for _, rwFormat := range []config.RemoteWriteFormat{Version1, Version2} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig @@ -964,10 +967,10 @@ type TestWriteClient struct { writesReceived int mtx sync.Mutex buf []byte - rwFormat RemoteWriteFormat + rwFormat config.RemoteWriteFormat } -func NewTestWriteClient(rwFormat RemoteWriteFormat) *TestWriteClient { +func NewTestWriteClient(rwFormat config.RemoteWriteFormat) *TestWriteClient { return &TestWriteClient{ receivedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{}, @@ -1772,7 +1775,7 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { // Warmup buffers for i := 0; i < 10; i++ { populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) - buildMinimizedWriteRequestStr(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil) + buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil) } b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { @@ -1780,7 +1783,7 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { for j := 0; j < b.N; j++ { populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) b.ResetTimer() - req, _, _, err := buildMinimizedWriteRequestStr(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil) + req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil) if err != nil { 
b.Fatal(err) } diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index ae75231d5..fe823288c 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -92,7 +92,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { for _, tc := range cases { t.Run("", func(t *testing.T) { // todo: test with new format type(s)? - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteReadConfigs: tc.cfgs, diff --git a/storage/remote/storage.go b/storage/remote/storage.go index fa59f57bc..afa2d411a 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -62,7 +62,7 @@ type Storage struct { } // NewStorage returns a remote.Storage. -func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat, metadataInWAL bool) *Storage { +func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWAL bool) *Storage { if l == nil { l = log.NewNopLogger() } @@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal logger: logger, localStartTimeCallback: stCallback, } - s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, rwFormat, metadataInWAL) + s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL) return s } diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index 1fa0c4b8d..8b44eb140 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -30,7 +30,7 @@ func TestStorageLifecycle(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, 
Version1, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -58,7 +58,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, @@ -80,7 +80,7 @@ func TestFilterExternalLabels(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{ @@ -106,7 +106,7 @@ func TestIgnoreExternalLabels(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{ @@ -158,7 +158,7 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig { // ApplyConfig runs concurrently with Notify // See https://github.com/prometheus/prometheus/issues/12747 func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) { - s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, Version1, false) + s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false) var wg sync.WaitGroup wg.Add(2000) diff --git a/storage/remote/write.go b/storage/remote/write.go index afb7dbd69..8d584530f 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -66,7 +66,6 @@ type WriteStorage struct { externalLabels labels.Labels dir string queues map[string]*QueueManager - rwFormat RemoteWriteFormat metadataInWAL bool 
samplesIn *ewmaRate flushDeadline time.Duration @@ -79,13 +78,12 @@ type WriteStorage struct { } // NewWriteStorage creates and runs a WriteStorage. -func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat, metadataInWal bool) *WriteStorage { +func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal bool) *WriteStorage { if logger == nil { logger = log.NewNopLogger() } rws := &WriteStorage{ queues: make(map[string]*QueueManager), - rwFormat: rwFormat, watcherMetrics: wlog.NewWatcherMetrics(reg), liveReaderMetrics: wlog.NewLiveReaderMetrics(reg), logger: logger, @@ -96,6 +94,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f interner: newPool(), scraper: sm, quit: make(chan struct{}), + metadataInWAL: metadataInWal, highestTimestamp: &maxTimestamp{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -149,8 +148,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { newQueues := make(map[string]*QueueManager) newHashes := []string{} for _, rwConf := range conf.RemoteWriteConfigs { - // todo: change the rws.rwFormat to a queue config field - if rws.rwFormat > Version1 && rws.metadataInWAL { + if rwConf.ProtocolVersion > Version1 && !rws.metadataInWAL { return errors.New("invalid remote write configuration, if you are using remote write version 2.0 then the feature flag for metadata records in the WAL must be enabled") } hash, err := toHash(rwConf) @@ -173,7 +171,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { c, err := NewWriteClient(name, &ClientConfig{ URL: rwConf.URL, - RemoteWriteFormat: rws.rwFormat, + RemoteWriteFormat: rwConf.ProtocolVersion, Timeout: rwConf.RemoteTimeout, HTTPClientConfig: rwConf.HTTPClientConfig, SigV4Config: rwConf.SigV4Config, @@ -216,7 +214,7 @@ func (rws 
*WriteStorage) ApplyConfig(conf *config.Config) error { rws.scraper, rwConf.SendExemplars, rwConf.SendNativeHistograms, - rws.rwFormat, + rwConf.ProtocolVersion, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index f1c479984..0ebeda806 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -19,24 +19,24 @@ import ( "fmt" "net/http" - "github.com/prometheus/prometheus/model/labels" - writev2 "github.com/prometheus/prometheus/prompb/write/v2" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/write/v2" "github.com/prometheus/prometheus/storage" otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" ) const ( - RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" - RemoteWriteVersion1HeaderValue = "0.1.0" - RemoteWriteVersion11HeaderValue = "1.1" // TODO-RW11: Final value? + RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" + RemoteWriteVersion1HeaderValue = "0.1.0" + RemoteWriteVersion2HeaderValue = "2.0" ) type writeHandler struct { @@ -47,13 +47,13 @@ type writeHandler struct { // Experimental feature, new remote write proto format // The handler will accept the new format, but it can still accept the old one - // TODO: this should eventually be via content negotiation - rwFormat RemoteWriteFormat + // TODO: this should eventually be via content negotiation? + rwFormat config.RemoteWriteFormat } // NewWriteHandler creates a http.Handler that accepts remote write requests and // writes them to the provided appendable. 
-func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat RemoteWriteFormat) http.Handler { +func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat config.RemoteWriteFormat) http.Handler { h := &writeHandler{ logger: logger, appendable: appendable, diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index dc99ecf62..dad2663bb 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -85,11 +85,11 @@ func TestRemoteWriteHandler(t *testing.T) { } func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { - buf, _, _, err := buildMinimizedWriteRequestStr(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil) + buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) + req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion2HeaderValue) require.NoError(t, err) appendable := &mockAppendable{} diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index 7aa45af11..62a3d087f 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -118,7 +118,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) { for _, tc := range cases { // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Version1, false) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: tc.cfgs, @@ -141,7 +141,7 @@ func TestRestartOnNameChange(t *testing.T) { require.NoError(t, err) // todo: test with new format type(s) - s := NewWriteStorage(nil, 
nil, dir, time.Millisecond, nil, Version1, false) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, @@ -167,7 +167,7 @@ func TestUpdateWithRegisterer(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, Version1, false) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false) c1 := &config.RemoteWriteConfig{ Name: "named", URL: &common_config.URL{ @@ -208,7 +208,7 @@ func TestWriteStorageLifecycle(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Version1, false) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -226,7 +226,7 @@ func TestUpdateExternalLabels(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, Version1, false) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false) externalLabels := labels.FromStrings("external", "true") conf := &config.Config{ @@ -256,7 +256,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Version1, false) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -282,7 +282,7 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Version1, false) + s := NewWriteStorage(nil, nil, 
dir, defaultFlushDeadline, nil, false) c0 := &config.RemoteWriteConfig{ RemoteTimeout: model.Duration(10 * time.Second), diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 3e6ee3eef..b82e9cd37 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -88,7 +88,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) * t.Helper() dbDir := t.TempDir() - rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, remote.Version1, false) + rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false) t.Cleanup(func() { require.NoError(t, rs.Close()) }) @@ -584,7 +584,7 @@ func TestLockfile(t *testing.T) { tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) { logger := log.NewNopLogger() reg := prometheus.NewRegistry() - rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, remote.Version1, false) + rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false) t.Cleanup(func() { require.NoError(t, rs.Close()) }) @@ -604,7 +604,7 @@ func TestLockfile(t *testing.T) { func Test_ExistingWAL_NextRef(t *testing.T) { dbDir := t.TempDir() - rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, remote.Version1, false) + rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false) defer func() { require.NoError(t, rs.Close()) }() diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 85534d52e..4d71ed93d 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -253,7 +253,7 @@ func NewAPI( registerer prometheus.Registerer, statsRenderer StatsRenderer, rwEnabled bool, - rwFormat remote.RemoteWriteFormat, + rwFormat config.RemoteWriteFormat, otlpEnabled bool, ) *API { a := &API{ diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 20b53df4d..c6bd2ee62 100644 --- 
a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -462,7 +462,7 @@ func TestEndpoints(t *testing.T) { // TODO: test with other proto format(s)? remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) { return 0, nil - }, dbDir, 1*time.Second, nil, remote.Version1, false) + }, dbDir, 1*time.Second, nil, false) err = remote.ApplyConfig(&config.Config{ RemoteReadConfigs: []*config.RemoteReadConfig{ diff --git a/web/web.go b/web/web.go index 69cd67d38..5e9959850 100644 --- a/web/web.go +++ b/web/web.go @@ -57,7 +57,6 @@ import ( "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/template" "github.com/prometheus/prometheus/util/httputil" api_v1 "github.com/prometheus/prometheus/web/api/v1" @@ -262,7 +261,8 @@ type Options struct { EnableOTLPWriteReceiver bool IsAgent bool AppName string - RemoteWriteFormat remote.RemoteWriteFormat + // TODO(cstyan): should this change to a list of tuples, maybe via the content negotiation PR? + RemoteWriteFormat config.RemoteWriteFormat Gatherer prometheus.Gatherer Registerer prometheus.Registerer