fix NewWriteClient and change new flags wording

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Nicolás Pazos 2023-10-23 12:13:33 -03:00
parent 7dc2002ca9
commit ba3422df1f
6 changed files with 81 additions and 81 deletions

View file

@ -146,7 +146,6 @@ type flagConfig struct {
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline model.Duration
rwProto bool
featureList []string
// These options are extracted from featureList
@ -155,6 +154,7 @@ type flagConfig struct {
enableNewSDManager bool
enablePerStepStats bool
enableAutoGOMAXPROCS bool
enableSenderRemoteWrite11 bool
prometheusURL string
corsRegexString string
@ -220,12 +220,12 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
continue
case "promql-at-modifier", "promql-negative-offset":
level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", o)
case "reduced-rw-proto":
c.rwProto = true
level.Info(logger).Log("msg", "Reduced remote write proto format will be used, remote write receiver must be able to parse this new protobuf format.")
case "reduced-rw-proto-receiver":
c.web.EnableReducedWriteProtoReceiver = true
level.Info(logger).Log("msg", "Reduced proto format will be expected by the remote write receiver, client must send this new protobuf format.")
case "rw-1-1-sender":
c.enableSenderRemoteWrite11 = true
level.Info(logger).Log("msg", "Experimental remote write 1.1 will be used on the sender end, receiver must be able to parse this new protobuf format.")
case "rw-1-1-receiver":
c.web.EnableReceiverRemoteWrite11 = true
level.Info(logger).Log("msg", "Experimental remote write 1.1 will be supported on the receiver end, receiver can send this new protobuf format.")
default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
}
@ -611,7 +611,7 @@ func main() {
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.rwProto)
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.enableSenderRemoteWrite11)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

View file

@ -83,7 +83,7 @@ func init() {
type Client struct {
remoteName string // Used to differentiate clients in metrics.
urlString string // url.String()
remotewrite11 bool
remotewrite11 bool // For write clients, ignored for read clients.
Client *http.Client
timeout time.Duration
@ -128,7 +128,6 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) {
return &Client{
remoteName: name,
urlString: conf.URL.String(),
remotewrite11: conf.RemoteWrite11,
Client: httpClient,
timeout: time.Duration(conf.Timeout),
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
@ -166,6 +165,7 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
httpClient.Transport = otelhttp.NewTransport(t)
return &Client{
remotewrite11: conf.RemoteWrite11,
remoteName: name,
urlString: conf.URL.String(),
Client: httpClient,

View file

@ -67,7 +67,7 @@ func TestSampleDelivery(t *testing.T) {
exemplars bool
histograms bool
floatHistograms bool
internProto bool
remoteWrite11 bool
}{
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
@ -75,11 +75,11 @@ func TestSampleDelivery(t *testing.T) {
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
{internProto: true, samples: true, exemplars: false, histograms: false, name: "interned samples only"},
{internProto: true, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"},
{internProto: true, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
{internProto: true, samples: false, exemplars: false, histograms: true, name: "interned histograms only"},
{internProto: true, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"},
{remoteWrite11: true, samples: true, exemplars: false, histograms: false, name: "interned samples only"},
{remoteWrite11: true, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"},
{remoteWrite11: true, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
{remoteWrite11: true, samples: false, exemplars: false, histograms: true, name: "interned histograms only"},
{remoteWrite11: true, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"},
}
// Let's create an even number of send batches so we don't run into the
@ -106,7 +106,7 @@ func TestSampleDelivery(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.internProto)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.remoteWrite11)
defer s.Close()
var (
@ -139,7 +139,7 @@ func TestSampleDelivery(t *testing.T) {
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient(tc.internProto)
c := NewTestWriteClient(tc.remoteWrite11)
qm.SetClient(c)
qm.StoreSeries(series, 0)
@ -204,13 +204,13 @@ func TestMetadataDelivery(t *testing.T) {
}
func TestSampleDeliveryTimeout(t *testing.T) {
for _, proto := range []string{"interned", "non-interned"} {
for _, proto := range []string{"1.1", "1.0"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
remoteWrite11 := proto == "1.1"
// Let's send one less sample than batch size, and wait the timeout duration
n := 9
samples, series := createTimeseries(n, n)
c := NewTestWriteClient(internProto)
c := NewTestWriteClient(remoteWrite11)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
@ -220,7 +220,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -238,9 +238,9 @@ func TestSampleDeliveryTimeout(t *testing.T) {
}
func TestSampleDeliveryOrder(t *testing.T) {
for _, proto := range []string{"interned", "non-interned"} {
for _, proto := range []string{"1.1", "1.0"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
remoteWrite11 := proto == "1.1"
ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
samples := make([]record.RefSample, 0, n)
@ -258,7 +258,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
})
}
c := NewTestWriteClient(internProto)
c := NewTestWriteClient(remoteWrite11)
c.expectSamples(samples, series)
dir := t.TempDir()
@ -267,7 +267,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m.StoreSeries(series, 0)
m.Start()
@ -341,15 +341,15 @@ func TestSeriesReset(t *testing.T) {
}
func TestReshard(t *testing.T) {
for _, proto := range []string{"interned", "non-interned"} {
for _, proto := range []string{"1.1", "1.0"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
remoteWrite11 := proto == "1.1"
size := 10 // Make bigger to find more races.
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
samples, series := createTimeseries(nSamples, nSeries)
c := NewTestWriteClient(internProto)
c := NewTestWriteClient(remoteWrite11)
c.expectSamples(samples, series)
cfg := config.DefaultQueueConfig
@ -359,7 +359,7 @@ func TestReshard(t *testing.T) {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m.StoreSeries(series, 0)
m.Start()
@ -385,10 +385,10 @@ func TestReshard(t *testing.T) {
}
func TestReshardRaceWithStop(t *testing.T) {
for _, proto := range []string{"interned", "non-interned"} {
for _, proto := range []string{"1.1", "1.0"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
c := NewTestWriteClient(internProto)
remoteWrite11 := proto == "1.1"
c := NewTestWriteClient(remoteWrite11)
var m *QueueManager
h := sync.Mutex{}
@ -400,7 +400,7 @@ func TestReshardRaceWithStop(t *testing.T) {
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m.Start()
h.Unlock()
h.Lock()
@ -425,9 +425,9 @@ func TestReshardRaceWithStop(t *testing.T) {
}
func TestReshardPartialBatch(t *testing.T) {
for _, proto := range []string{"interned", "non-interned"} {
for _, proto := range []string{"1.1", "1.0"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
remoteWrite11 := proto == "1.1"
samples, series := createTimeseries(1, 10)
c := NewTestBlockedWriteClient()
@ -440,7 +440,7 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m.StoreSeries(series, 0)
m.Start()
@ -472,9 +472,9 @@ func TestReshardPartialBatch(t *testing.T) {
// where a large scrape (> capacity + max samples per send) is appended at the
// same time as a batch times out according to the batch send deadline.
func TestQueueFilledDeadlock(t *testing.T) {
for _, proto := range []string{"interned", "non-interned"} {
for _, proto := range []string{"1.1", "1.0"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
remoteWrite11 := proto == "1.1"
samples, series := createTimeseries(50, 1)
c := NewNopWriteClient()
@ -490,7 +490,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -515,14 +515,14 @@ func TestQueueFilledDeadlock(t *testing.T) {
}
func TestReleaseNoninternedString(t *testing.T) {
for _, proto := range []string{"interned", "non-interned"} {
for _, proto := range []string{"1.1", "1.0"} {
t.Run(proto, func(t *testing.T) {
internProto := proto == "interned"
remoteWrite11 := proto == "1.1"
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient(internProto)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, internProto)
c := NewTestWriteClient(remoteWrite11)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, remoteWrite11)
m.Start()
defer m.Stop()
@ -738,16 +738,16 @@ type TestWriteClient struct {
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
expectInternProto bool
expectRemoteWrite11 bool
}
func NewTestWriteClient(expectIntern bool) *TestWriteClient {
func NewTestWriteClient(expectRemoteWrite11 bool) *TestWriteClient {
return &TestWriteClient{
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
expectInternProto: expectIntern,
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
expectRemoteWrite11: expectRemoteWrite11,
}
}
@ -863,7 +863,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
}
var reqProto *prompb.WriteRequest
if c.expectInternProto {
if c.expectRemoteWrite11 {
var reqReduced prompb.WriteRequestWithRefs
err = proto.Unmarshal(reqBuf, &reqReduced)
if err == nil {

View file

@ -62,7 +62,7 @@ type Storage struct {
}
// NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *Storage {
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool) *Storage {
if l == nil {
l = log.NewNopLogger()
}
@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger,
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, writeReducedProto)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, remoteWrite11)
return s
}

View file

@ -65,7 +65,7 @@ type WriteStorage struct {
externalLabels labels.Labels
dir string
queues map[string]*QueueManager
writeReducedProto bool
remoteWrite11 bool
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
@ -77,13 +77,13 @@ type WriteStorage struct {
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, writeReducedProto bool) *WriteStorage {
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, remoteWrite11 bool) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
rws := &WriteStorage{
queues: make(map[string]*QueueManager),
writeReducedProto: writeReducedProto,
remoteWrite11: remoteWrite11,
watcherMetrics: wlog.NewWatcherMetrics(reg),
liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
logger: logger,
@ -167,7 +167,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
c, err := NewWriteClient(name, &ClientConfig{
URL: rwConf.URL,
RemoteWrite11: rws.writeReducedProto,
RemoteWrite11: rws.remoteWrite11,
Timeout: rwConf.RemoteTimeout,
HTTPClientConfig: rwConf.HTTPClientConfig,
SigV4Config: rwConf.SigV4Config,
@ -211,7 +211,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.scraper,
rwConf.SendExemplars,
rwConf.SendNativeHistograms,
rws.writeReducedProto,
rws.remoteWrite11,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)

View file

@ -241,27 +241,27 @@ type Options struct {
Version *PrometheusVersion
Flags map[string]string
ListenAddress string
CORSOrigin *regexp.Regexp
ReadTimeout time.Duration
MaxConnections int
ExternalURL *url.URL
RoutePrefix string
UseLocalAssets bool
UserAssetsPath string
ConsoleTemplatesPath string
ConsoleLibrariesPath string
EnableLifecycle bool
EnableAdminAPI bool
PageTitle string
RemoteReadSampleLimit int
RemoteReadConcurrencyLimit int
RemoteReadBytesInFrame int
EnableRemoteWriteReceiver bool
EnableOTLPWriteReceiver bool
IsAgent bool
AppName string
EnableReducedWriteProtoReceiver bool
ListenAddress string
CORSOrigin *regexp.Regexp
ReadTimeout time.Duration
MaxConnections int
ExternalURL *url.URL
RoutePrefix string
UseLocalAssets bool
UserAssetsPath string
ConsoleTemplatesPath string
ConsoleLibrariesPath string
EnableLifecycle bool
EnableAdminAPI bool
PageTitle string
RemoteReadSampleLimit int
RemoteReadConcurrencyLimit int
RemoteReadBytesInFrame int
EnableRemoteWriteReceiver bool
EnableOTLPWriteReceiver bool
IsAgent bool
AppName string
EnableReceiverRemoteWrite11 bool
Gatherer prometheus.Gatherer
Registerer prometheus.Registerer
@ -352,7 +352,7 @@ func New(logger log.Logger, o *Options) *Handler {
nil,
o.EnableRemoteWriteReceiver,
o.EnableOTLPWriteReceiver,
o.EnableReducedWriteProtoReceiver,
o.EnableReceiverRemoteWrite11,
)
if o.RoutePrefix != "/" {