Remote write v2: metadata support in every write request (#13394)

* Approach bundling metadata along with samples and exemplars

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Add first test; rebase with main

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Alternative approach: bundle metadata in TimeSeries protobuf

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* update go mod to match main branch

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* fix after rebase

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* we're not going to modify the 1.X format anymore

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Modify AppendMetadata based on the fact that we'll be putting metadata into
timeseries

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Rename enums for remote write versions to something that makes more
sense + remove the added `sendMetadata` flag.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* rename flag that enables writing of metadata records to the WAL

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* additional clean up

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* lint

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* fix usage of require.Len

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* some clean up from review comments

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* more review fixes

Signed-off-by: Callum Styan <callumstyan@gmail.com>

---------

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Paschalis Tsilias <paschalist0@gmail.com>
Callum Styan 2024-01-25 13:25:05 -08:00 committed by GitHub
parent 242158e7fc
commit a0f08a8365
24 changed files with 387 additions and 161 deletions
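
For orientation before the diff: remote write 2.0 as implemented here bundles metadata into each TimeSeries and replaces repeated label and metadata strings with references into a per-request symbol table. The following minimal Go sketch is illustrative only and is not part of this commit; the struct names and fields are simplified stand-ins for the writev2 WriteRequest, TimeSeries and Metadata messages touched below (Symbols, LabelsRefs, HelpRef, UnitRef).

    package main

    import "fmt"

    // Simplified stand-ins for the writev2 protobuf messages used in this commit:
    // a request-wide symbol table plus per-series label refs and metadata refs.
    type metadataV2 struct {
        Type    string // an enum in the real proto
        HelpRef uint32 // index into Symbols
        UnitRef uint32 // index into Symbols
    }

    type timeSeriesV2 struct {
        LabelsRefs []uint32 // name/value index pairs into Symbols
        Metadata   metadataV2
    }

    type writeRequestV2 struct {
        Symbols    []string
        Timeseries []timeSeriesV2
    }

    func main() {
        req := writeRequestV2{
            Symbols: []string{"__name__", "http_requests_total", "Total HTTP requests.", "requests"},
            Timeseries: []timeSeriesV2{{
                LabelsRefs: []uint32{0, 1}, // __name__="http_requests_total"
                Metadata:   metadataV2{Type: "counter", HelpRef: 2, UnitRef: 3},
            }},
        }
        ts := req.Timeseries[0]
        fmt.Println(req.Symbols[ts.LabelsRefs[1]], "help:", req.Symbols[ts.Metadata.HelpRef])
    }
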


@@ -187,6 +187,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
 		case "extra-scrape-metrics":
 			c.scrape.ExtraMetrics = true
 			level.Info(logger).Log("msg", "Experimental additional scrape metrics enabled")
+		case "metadata-wal-records":
+			c.scrape.EnableMetadataStorage = true
+			level.Info(logger).Log("msg", "Experimental metadata records in WAL enabled, required for remote write 2.0")
 		case "new-service-discovery-manager":
 			c.enableNewSDManager = true
 			level.Info(logger).Log("msg", "Experimental service discovery manager")
@@ -431,7 +434,7 @@ func main() {
 	a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
 		Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
-	a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
+	a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, metadata-wal-records. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
 		Default("").StringsVar(&cfg.featureList)
 	a.Flag("remote-write-format", "remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format)").
@@ -609,7 +612,7 @@ func main() {
 	var (
 		localStorage  = &readyStorage{stats: tsdb.NewDBStats()}
 		scraper       = &readyScrapeManager{}
-		remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, remote.RemoteWriteFormat(cfg.rwFormat))
+		remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, remote.RemoteWriteFormat(cfg.rwFormat), cfg.scrape.EnableMetadataStorage)
 		fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
 	)


@@ -52,7 +52,7 @@ The Prometheus monitoring server
 | <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
 | <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
 | <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
-| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
+| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, metadata-wal-records. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
 | <code class="text-nowrap">--remote-write-format</code> | remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format) | `0` |
 | <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
 | <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |


@@ -204,3 +204,10 @@ Enables ingestion of created timestamp. Created timestamps are injected as 0 val
 Currently Prometheus supports created timestamps only on the traditional Prometheus Protobuf protocol (WIP for other protocols). As a result, when enabling this feature, the Prometheus protobuf scrape protocol will be prioritized (See `scrape_config.scrape_protocols` settings for more details).
 
 Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped.
+
+## Metadata WAL Records
+
+`--enable-feature=metadata-wal-records`
+
+When enabled, Prometheus will store metadata in-memory and keep track of
+metadata changes as WAL records on a per-series basis. This must be used if
+you are also using remote write 2.0, as it will only gather metadata from the WAL.
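
To make the feature's effect on the remote write side concrete, here is a minimal sketch (not Prometheus source) of how a consumer could cache per-series metadata decoded from WAL metadata records, mirroring the QueueManager.StoreMetadata logic added later in this diff. The types are simplified stand-ins for record.RefMetadata and metadata.Metadata, so their exact fields are assumptions for illustration.

    package main

    import "fmt"

    // seriesRef stands in for chunks.HeadSeriesRef; refMetadata stands in for
    // record.RefMetadata as consumed by StoreMetadata in this commit.
    type seriesRef uint64

    type refMetadata struct {
        Ref  seriesRef
        Type string
        Unit string
        Help string
    }

    // seriesMeta stands in for metadata.Metadata.
    type seriesMeta struct {
        Type, Unit, Help string
    }

    // storeMetadata caches the latest metadata record seen for each series so it
    // can be attached to every outgoing remote write 2.0 time series.
    func storeMetadata(cache map[seriesRef]seriesMeta, recs []refMetadata) {
        for _, m := range recs {
            cache[m.Ref] = seriesMeta{Type: m.Type, Unit: m.Unit, Help: m.Help}
        }
    }

    func main() {
        cache := map[seriesRef]seriesMeta{}
        storeMetadata(cache, []refMetadata{
            {Ref: 1, Type: "counter", Unit: "seconds", Help: "Total CPU time."},
        })
        fmt.Println(cache[1].Help) // Total CPU time.
    }
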

go.mod

@@ -46,7 +46,7 @@ require (
 	github.com/oklog/ulid v1.3.1
 	github.com/ovh/go-ovh v1.4.3
 	github.com/prometheus/alertmanager v0.26.0
-	github.com/prometheus/client_golang v1.17.0
+	github.com/prometheus/client_golang v1.18.0
 	github.com/prometheus/client_model v0.5.0
 	github.com/prometheus/common v0.45.1-0.20231122191551-832cd6e99f99
 	github.com/prometheus/common/assets v0.2.0
@@ -168,7 +168,7 @@ require (
 	github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
-	github.com/prometheus/procfs v0.11.1 // indirect
+	github.com/prometheus/procfs v0.12.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
 	github.com/stretchr/objx v0.5.0 // indirect
 	github.com/xhit/go-str2duration/v2 v2.1.0 // indirect

go.sum

@@ -636,8 +636,8 @@ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeD
 github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
 github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
 github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
-github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
-github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
+github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
+github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -667,8 +667,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
 github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
-github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
-github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
+github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
+github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=


@@ -211,7 +211,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
 	httpReq.Header.Set("Content-Type", "application/x-protobuf")
 	httpReq.Header.Set("User-Agent", UserAgent)
-	if c.rwFormat == Base1 {
+	if c.rwFormat == Version1 {
 		httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue)
 	} else {
 		// Set the right header if we're using v1.1 remote write protocol


@@ -33,6 +33,7 @@ import (
 	"github.com/prometheus/prometheus/model/exemplar"
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/metadata"
 	"github.com/prometheus/prometheus/prompb"
 	writev2 "github.com/prometheus/prometheus/prompb/write/v2"
 	"github.com/prometheus/prometheus/storage"
@@ -628,17 +629,25 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
 	}
 }
 
-func minExemplarProtoToExemplar(ep writev2.Exemplar, symbols []string) exemplar.Exemplar {
+func exemplarProtoV2ToExemplar(ep writev2.Exemplar, symbols []string) exemplar.Exemplar {
 	timestamp := ep.Timestamp
 
 	return exemplar.Exemplar{
-		Labels: Uint32StrRefToLabels(symbols, ep.LabelsRefs),
+		Labels: labelProtosV2ToLabels(ep.LabelsRefs, symbols),
 		Value:  ep.Value,
 		Ts:     timestamp,
 		HasTs:  timestamp != 0,
 	}
 }
 
+func metadataProtoV2ToMetadata(mp writev2.Metadata, symbols []string) metadata.Metadata {
+	return metadata.Metadata{
+		Type: metricTypeFromProtoV2Equivalent(mp.Type),
+		Unit: symbols[mp.UnitRef],
+		Help: symbols[mp.HelpRef],
+	}
+}
+
 // HistogramProtoToHistogram extracts a (normal integer) Histogram from the
 // provided proto message. The caller has to make sure that the proto message
 // represents an integer histogram and not a float histogram, or it panics.
@@ -660,6 +669,27 @@ func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
 	}
 }
 
+// HistogramProtoV2ToHistogram extracts a (normal integer) Histogram from the
+// provided proto message. The caller has to make sure that the proto message
+// represents an integer histogram and not a float histogram, or it panics.
+func HistogramProtoV2ToHistogram(hp writev2.Histogram) *histogram.Histogram {
+	if hp.IsFloatHistogram() {
+		panic("HistogramProtoToHistogram called with a float histogram")
+	}
+	return &histogram.Histogram{
+		CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
+		Schema:           hp.Schema,
+		ZeroThreshold:    hp.ZeroThreshold,
+		ZeroCount:        hp.GetZeroCountInt(),
+		Count:            hp.GetCountInt(),
+		Sum:              hp.Sum,
+		PositiveSpans:    spansProtoV2ToSpans(hp.GetPositiveSpans()),
+		PositiveBuckets:  hp.GetPositiveDeltas(),
+		NegativeSpans:    spansProtoV2ToSpans(hp.GetNegativeSpans()),
+		NegativeBuckets:  hp.GetNegativeDeltas(),
+	}
+}
+
 // FloatHistogramProtoToFloatHistogram extracts a float Histogram from the
 // provided proto message to a Float Histogram. The caller has to make sure that
 // the proto message represents a float histogram and not an integer histogram,
@@ -682,6 +712,28 @@ func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHi
 	}
 }
 
+// FloatHistogramProtoV2ToFloatHistogram extracts a float Histogram from the
+// provided proto message to a Float Histogram. The caller has to make sure that
+// the proto message represents a float histogram and not an integer histogram,
+// or it panics.
+func FloatHistogramProtoV2ToFloatHistogram(hp writev2.Histogram) *histogram.FloatHistogram {
+	if !hp.IsFloatHistogram() {
+		panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
+	}
+	return &histogram.FloatHistogram{
+		CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
+		Schema:           hp.Schema,
+		ZeroThreshold:    hp.ZeroThreshold,
+		ZeroCount:        hp.GetZeroCountFloat(),
+		Count:            hp.GetCountFloat(),
+		Sum:              hp.Sum,
+		PositiveSpans:    spansProtoV2ToSpans(hp.GetPositiveSpans()),
+		PositiveBuckets:  hp.GetPositiveCounts(),
+		NegativeSpans:    spansProtoV2ToSpans(hp.GetNegativeSpans()),
+		NegativeBuckets:  hp.GetNegativeCounts(),
+	}
+}
+
 // HistogramProtoToFloatHistogram extracts and converts a (normal integer) histogram from the provided proto message
 // to a float histogram. The caller has to make sure that the proto message represents an integer histogram and not a
 // float histogram, or it panics.
@@ -714,9 +766,9 @@ func FloatMinHistogramProtoToFloatHistogram(hp writev2.Histogram) *histogram.Flo
 		ZeroCount:        hp.GetZeroCountFloat(),
 		Count:            hp.GetCountFloat(),
 		Sum:              hp.Sum,
-		PositiveSpans:    minSpansProtoToSpans(hp.GetPositiveSpans()),
+		PositiveSpans:    spansProtoV2ToSpans(hp.GetPositiveSpans()),
 		PositiveBuckets:  hp.GetPositiveCounts(),
-		NegativeSpans:    minSpansProtoToSpans(hp.GetNegativeSpans()),
+		NegativeSpans:    spansProtoV2ToSpans(hp.GetNegativeSpans()),
 		NegativeBuckets:  hp.GetNegativeCounts(),
 	}
 }
@@ -735,9 +787,9 @@ func MinHistogramProtoToHistogram(hp writev2.Histogram) *histogram.Histogram {
 		ZeroCount:        hp.GetZeroCountInt(),
 		Count:            hp.GetCountInt(),
 		Sum:              hp.Sum,
-		PositiveSpans:    minSpansProtoToSpans(hp.GetPositiveSpans()),
+		PositiveSpans:    spansProtoV2ToSpans(hp.GetPositiveSpans()),
 		PositiveBuckets:  hp.GetPositiveDeltas(),
-		NegativeSpans:    minSpansProtoToSpans(hp.GetNegativeSpans()),
+		NegativeSpans:    spansProtoV2ToSpans(hp.GetNegativeSpans()),
 		NegativeBuckets:  hp.GetNegativeDeltas(),
 	}
 }
@@ -751,7 +803,7 @@ func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
 	return spans
 }
 
-func minSpansProtoToSpans(s []writev2.BucketSpan) []histogram.Span {
+func spansProtoV2ToSpans(s []writev2.BucketSpan) []histogram.Span {
 	spans := make([]histogram.Span, len(s))
 	for i := 0; i < len(s); i++ {
 		spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
@@ -870,6 +922,17 @@ func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
 	return b.Labels()
 }
 
+// labelProtosV2ToLabels transforms v2 proto labels references, which are uint32 values, into labels via
+// indexing into the symbols slice.
+func labelProtosV2ToLabels(labelRefs []uint32, symbols []string) labels.Labels {
+	b := labels.ScratchBuilder{}
+	for i := 0; i < len(labelRefs); i += 2 {
+		b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]])
+	}
+	b.Sort()
+	return b.Labels()
+}
+
 // labelsToLabelsProto transforms labels into prompb labels. The buffer slice
 // will be used to avoid allocations if it is big enough to store the labels.
 func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label {
@@ -883,8 +946,7 @@ func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label
 	return result
 }
 
-// TODO.
-func labelsToUint32SliceStr(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 {
+func labelsToLabelsProtoV2Refs(lbls labels.Labels, symbolTable *rwSymbolTable, buf []uint32) []uint32 {
 	result := buf[:0]
 	lbls.Range(func(l labels.Label) {
 		off := symbolTable.RefStr(l.Name)
@@ -895,24 +957,6 @@ func labelsToUint32SliceStr(lbls labels.Labels, symbolTable *rwSymbolTable, buf
 	return result
 }
 
-// TODO.
-func Uint32StrRefToLabels(symbols []string, minLabels []uint32) labels.Labels {
-	ls := labels.NewScratchBuilder(len(minLabels) / 2)
-
-	strIdx := 0
-	for strIdx < len(minLabels) {
-		// todo, check for overflow?
-		nameIdx := minLabels[strIdx]
-		strIdx++
-		valueIdx := minLabels[strIdx]
-		strIdx++
-		ls.Add(symbols[nameIdx], symbols[valueIdx])
-	}
-
-	return ls.Labels()
-}
-
 // metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
 func metricTypeToMetricTypeProto(t model.MetricType) prompb.MetricMetadata_MetricType {
 	mt := strings.ToUpper(string(t))
@@ -924,6 +968,22 @@ func metricTypeToMetricTypeProto(t model.MetricType) prompb.MetricMetadata_Metri
 	return prompb.MetricMetadata_MetricType(v)
 }
 
+// metricTypeToMetricTypeProtoV2 transforms a Prometheus metricType into writev2 metricType. Since the former is a string we need to transform it to an enum.
+func metricTypeToMetricTypeProtoV2(t model.MetricType) writev2.Metadata_MetricType {
+	mt := strings.ToUpper(string(t))
+	v, ok := prompb.MetricMetadata_MetricType_value[mt]
+	if !ok {
+		return writev2.Metadata_METRIC_TYPE_UNSPECIFIED
+	}
+
+	return writev2.Metadata_MetricType(v)
+}
+
+func metricTypeFromProtoV2Equivalent(t writev2.Metadata_MetricType) model.MetricType {
+	mt := strings.ToLower(t.String())
+	return model.MetricType(mt) // TODO(@tpaschalis) a better way for this?
+}
+
 // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
 // snappy decompression.
 func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
@@ -1024,7 +1084,7 @@ func MinimizedWriteRequestToWriteRequest(redReq *writev2.WriteRequest) (*prompb.
 	}
 
 	for i, rts := range redReq.Timeseries {
-		Uint32StrRefToLabels(redReq.Symbols, rts.LabelsRefs).Range(func(l labels.Label) {
+		labelProtosV2ToLabels(rts.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) {
 			req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
 				Name:  l.Name,
 				Value: l.Value,
@@ -1035,7 +1095,7 @@ func MinimizedWriteRequestToWriteRequest(redReq *writev2.WriteRequest) (*prompb.
 		for j, e := range rts.Exemplars {
 			exemplars[j].Value = e.Value
 			exemplars[j].Timestamp = e.Timestamp
-			Uint32StrRefToLabels(redReq.Symbols, e.LabelsRefs).Range(func(l labels.Label) {
+			labelProtosV2ToLabels(e.LabelsRefs, redReq.Symbols).Range(func(l labels.Label) {
 				exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
 					Name:  l.Name,
 					Value: l.Value,
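
The functions renamed above (labelsToLabelsProtoV2Refs and labelProtosV2ToLabels) implement the v2 label encoding: each label becomes a name-ref/value-ref pair of indices into a shared symbols slice, and decoding resolves the pairs back. The sketch below is not the Prometheus implementation (rwSymbolTable is unexported), so a self-contained stand-in symbol table is used; it only illustrates the ref-pair round trip that TestStrFormat exercises.

    package main

    import "fmt"

    // symTable is a stand-in for the unexported rwSymbolTable: it interns strings
    // and returns their index into a shared symbols slice.
    type symTable struct {
        symbols []string
        index   map[string]uint32
    }

    func newSymTable() *symTable { return &symTable{index: map[string]uint32{}} }

    func (s *symTable) refStr(v string) uint32 {
        if off, ok := s.index[v]; ok {
            return off
        }
        off := uint32(len(s.symbols))
        s.symbols = append(s.symbols, v)
        s.index[v] = off
        return off
    }

    // encodeLabels mirrors labelsToLabelsProtoV2Refs: each label becomes a
    // name-ref followed by a value-ref.
    func encodeLabels(pairs [][2]string, t *symTable) []uint32 {
        refs := make([]uint32, 0, 2*len(pairs))
        for _, p := range pairs {
            refs = append(refs, t.refStr(p[0]), t.refStr(p[1]))
        }
        return refs
    }

    // decodeLabels mirrors labelProtosV2ToLabels: refs are read back in pairs and
    // resolved against the symbols slice.
    func decodeLabels(refs []uint32, symbols []string) [][2]string {
        out := make([][2]string, 0, len(refs)/2)
        for i := 0; i+1 < len(refs); i += 2 {
            out = append(out, [2]string{symbols[refs[i]], symbols[refs[i+1]]})
        }
        return out
    }

    func main() {
        t := newSymTable()
        refs := encodeLabels([][2]string{{"asdf", "qwer"}, {"zxcv", "1234"}}, t)
        fmt.Println(refs)                          // e.g. [0 1 2 3]
        fmt.Println(decodeLabels(refs, t.symbols)) // [[asdf qwer] [zxcv 1234]]
    }
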


@@ -898,7 +898,7 @@ func (c *mockChunkIterator) Err() error {
 func TestStrFormat(t *testing.T) {
 	r := newRwSymbolTable()
 	ls := labels.FromStrings("asdf", "qwer", "zxcv", "1234")
-	encoded := labelsToUint32SliceStr(ls, &r, nil)
-	decoded := Uint32StrRefToLabels(r.LabelsStrings(), encoded)
+	encoded := labelsToLabelsProtoV2Refs(ls, &r, nil)
+	decoded := labelProtosV2ToLabels(encoded, r.LabelsStrings())
 	require.Equal(t, ls, decoded)
 }


@@ -27,7 +27,7 @@ import (
 
 // MetadataAppender is an interface used by the Metadata Watcher to send metadata, It is read from the scrape manager, on to somewhere else.
 type MetadataAppender interface {
-	AppendMetadata(context.Context, []scrape.MetricMetadata)
+	AppendWatcherMetadata(context.Context, []scrape.MetricMetadata)
 }
 
 // Watchable represents from where we fetch active targets for metadata.
@@ -146,7 +146,7 @@ func (mw *MetadataWatcher) collect() {
 	}
 
 	// Blocks until the metadata is sent to the remote write endpoint or hardShutdownContext is expired.
-	mw.writer.AppendMetadata(mw.hardShutdownCtx, metadata)
+	mw.writer.AppendWatcherMetadata(mw.hardShutdownCtx, metadata)
 }
 
 func (mw *MetadataWatcher) ready() bool {


@@ -57,7 +57,7 @@ type writeMetadataToMock struct {
 	metadataAppended int
 }
 
-func (mwtm *writeMetadataToMock) AppendMetadata(_ context.Context, m []scrape.MetricMetadata) {
+func (mwtm *writeMetadataToMock) AppendWatcherMetadata(_ context.Context, m []scrape.MetricMetadata) {
 	mwtm.metadataAppended += len(m)
 }


@@ -35,6 +35,7 @@ import (
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/metadata"
 	"github.com/prometheus/prometheus/model/relabel"
 	"github.com/prometheus/prometheus/prompb"
 	writev2 "github.com/prometheus/prometheus/prompb/write/v2"
@@ -391,8 +392,8 @@ type WriteClient interface {
 type RemoteWriteFormat int64 //nolint:revive // exported.
 
 const (
-	Base1      RemoteWriteFormat = iota // original map based format
-	MinStrings                          // symbols are indices into an array of strings
+	Version1 RemoteWriteFormat = iota // 1.0, 0.1, etc.
+	Version2                          // symbols are indices into an array of strings
 )
 
 // QueueManager manages a queue of samples to be sent to the Storage
@@ -417,9 +418,10 @@ type QueueManager struct {
 	clientMtx   sync.RWMutex
 	storeClient WriteClient
 
-	seriesMtx     sync.Mutex // Covers seriesLabels and droppedSeries.
+	seriesMtx      sync.Mutex // Covers seriesLabels, seriesMetadata and droppedSeries.
 	seriesLabels   map[chunks.HeadSeriesRef]labels.Labels
-	droppedSeries map[chunks.HeadSeriesRef]struct{}
+	seriesMetadata map[chunks.HeadSeriesRef]*metadata.Metadata
+	droppedSeries  map[chunks.HeadSeriesRef]struct{}
 
 	seriesSegmentMtx     sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
 	seriesSegmentIndexes map[chunks.HeadSeriesRef]int
@@ -488,6 +490,7 @@ func NewQueueManager(
 		rwFormat:             rwFormat,
 		seriesLabels:         make(map[chunks.HeadSeriesRef]labels.Labels),
+		seriesMetadata:       make(map[chunks.HeadSeriesRef]*metadata.Metadata),
 		seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
 		droppedSeries:        make(map[chunks.HeadSeriesRef]struct{}),
@@ -505,7 +508,21 @@ func NewQueueManager(
 		highestRecvTimestamp: highestRecvTimestamp,
 	}
 
-	t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
+	walMetadata := false
+	if t.rwFormat > Version1 {
+		walMetadata = true
+	}
+	t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata)
+
+	// The current MetadataWatcher implementation is mutually exclusive
+	// with the new approach, which stores metadata as WAL records and
+	// ships them alongside series. If both mechanisms are set, the new one
+	// takes precedence by implicitly disabling the older one.
+	if t.mcfg.Send && t.rwFormat > Version1 {
+		level.Warn(logger).Log("msg", "usage of 'metadata_config.send' is redundant when using remote write v2 (or higher) as metadata will always be gathered from the WAL and included for every series within each write request")
+		t.mcfg.Send = false
+	}
+
 	if t.mcfg.Send {
 		t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
 	}
@@ -514,8 +531,14 @@ func NewQueueManager(
 	return t
 }
 
-// AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized.
-func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
+// AppendWatcherMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized.
+// This is only used for the metadata_config.send setting and 1.x Remote Write.
+func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
+	// no op for any newer proto format, which will cache metadata sent to it from the WAL watcher.
+	if t.rwFormat > Version2 {
+		return
+	}
+	// 1.X will still get metadata in batches.
 	mm := make([]prompb.MetricMetadata, 0, len(metadata))
 	for _, entry := range metadata {
 		mm = append(mm, prompb.MetricMetadata{
@@ -605,6 +628,8 @@ outer:
 			t.seriesMtx.Unlock()
 			continue
 		}
+		// todo: handle or at least log an error if no metadata is found
+		meta := t.seriesMetadata[s.Ref]
 		t.seriesMtx.Unlock()
 		// Start with a very small backoff. This should not be t.cfg.MinBackoff
 		// as it can happen without errors, and we want to pickup work after
@@ -619,6 +644,7 @@ outer:
 			}
 			if t.shards.enqueue(s.Ref, timeSeries{
 				seriesLabels: lbls,
+				metadata:     meta,
 				timestamp:    s.T,
 				value:        s.V,
 				sType:        tSample,
@@ -658,6 +684,7 @@ outer:
 			t.seriesMtx.Unlock()
 			continue
 		}
+		meta := t.seriesMetadata[e.Ref]
 		t.seriesMtx.Unlock()
 		// This will only loop if the queues are being resharded.
 		backoff := t.cfg.MinBackoff
@@ -669,6 +696,7 @@ outer:
 			}
 			if t.shards.enqueue(e.Ref, timeSeries{
 				seriesLabels:   lbls,
+				metadata:       meta,
 				timestamp:      e.T,
 				value:          e.V,
 				exemplarLabels: e.Labels,
@@ -706,6 +734,7 @@ outer:
 			t.seriesMtx.Unlock()
 			continue
 		}
+		meta := t.seriesMetadata[h.Ref]
 		t.seriesMtx.Unlock()
 
 		backoff := model.Duration(5 * time.Millisecond)
@@ -717,6 +746,7 @@ outer:
 			}
 			if t.shards.enqueue(h.Ref, timeSeries{
 				seriesLabels: lbls,
+				metadata:     meta,
 				timestamp:    h.T,
 				histogram:    h.H,
 				sType:        tHistogram,
@@ -753,6 +783,7 @@ outer:
 			t.seriesMtx.Unlock()
 			continue
 		}
+		meta := t.seriesMetadata[h.Ref]
 		t.seriesMtx.Unlock()
 
 		backoff := model.Duration(5 * time.Millisecond)
@@ -764,6 +795,7 @@ outer:
 			}
 			if t.shards.enqueue(h.Ref, timeSeries{
 				seriesLabels:   lbls,
+				metadata:       meta,
 				timestamp:      h.T,
 				floatHistogram: h.FH,
 				sType:          tFloatHistogram,
@@ -858,6 +890,23 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 	}
 }
 
+// StoreMetadata keeps track of known series' metadata for lookups when sending samples to remote.
+func (t *QueueManager) StoreMetadata(meta []record.RefMetadata) {
+	if t.rwFormat < Version2 {
+		return
+	}
+
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
+	for _, m := range meta {
+		t.seriesMetadata[m.Ref] = &metadata.Metadata{
+			Type: record.ToMetricType(m.Type),
+			Unit: m.Unit,
+			Help: m.Help,
+		}
+	}
+}
+
 // UpdateSeriesSegment updates the segment number held against the series,
 // so we can trim older ones in SeriesReset.
 func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int) {
@@ -883,6 +932,7 @@ func (t *QueueManager) SeriesReset(index int) {
 			delete(t.seriesSegmentIndexes, k)
 			t.releaseLabels(t.seriesLabels[k])
 			delete(t.seriesLabels, k)
+			delete(t.seriesMetadata, k)
 			delete(t.droppedSeries, k)
 		}
 	}
@@ -1107,6 +1157,7 @@ type shards struct {
 	samplesDroppedOnHardShutdown    atomic.Uint32
 	exemplarsDroppedOnHardShutdown  atomic.Uint32
 	histogramsDroppedOnHardShutdown atomic.Uint32
+	metadataDroppedOnHardShutdown   atomic.Uint32
 }
 
 // start the shards; must be called before any call to enqueue.
@@ -1135,6 +1186,7 @@ func (s *shards) start(n int) {
 	s.samplesDroppedOnHardShutdown.Store(0)
 	s.exemplarsDroppedOnHardShutdown.Store(0)
 	s.histogramsDroppedOnHardShutdown.Store(0)
+	s.metadataDroppedOnHardShutdown.Store(0)
 	for i := 0; i < n; i++ {
 		go s.runShard(hardShutdownCtx, i, newQueues[i])
 	}
@@ -1225,6 +1277,7 @@ type timeSeries struct {
 	value          float64
 	histogram      *histogram.Histogram
 	floatHistogram *histogram.FloatHistogram
+	metadata       *metadata.Metadata
 	timestamp      int64
 	exemplarLabels labels.Labels
 	// The type of series: sample, exemplar, or histogram.
@@ -1238,6 +1291,7 @@ const (
 	tExemplar
 	tHistogram
 	tFloatHistogram
+	tMetadata
 )
 
 func newQueue(batchSize, capacity int) *queue {
@@ -1261,6 +1315,9 @@ func newQueue(batchSize, capacity int) *queue {
 func (q *queue) Append(datum timeSeries) bool {
 	q.batchMtx.Lock()
 	defer q.batchMtx.Unlock()
+	// TODO: check if metadata now means we've reduced the total # of samples
+	// we can batch together here, and if so find a way to not include metadata
+	// in the batch size calculation.
 	q.batch = append(q.batch, datum)
 	if len(q.batch) == cap(q.batch) {
 		select {
@@ -1366,6 +1423,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 		pBufRaw []byte
 		buf     []byte
 	)
+	// TODO(@tpaschalis) Should we also raise the max if we have WAL metadata?
 	if s.qm.sendExemplars {
 		max += int(float64(max) * 0.1)
 	}
@@ -1420,14 +1478,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 				return
 			}
 			switch s.qm.rwFormat {
-			case Base1:
+			case Version1:
 				nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 				n := nPendingSamples + nPendingExemplars + nPendingHistograms
 				s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
-			case MinStrings:
-				nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
+			case Version2:
+				nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 				n := nPendingSamples + nPendingExemplars + nPendingHistograms
-				s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
+				s.sendV2Samples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf)
 				symbolTable.clear()
 			}
@@ -1440,16 +1498,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			batch := queue.Batch()
 			if len(batch) > 0 {
 				switch s.qm.rwFormat {
-				case Base1:
+				case Version1:
 					nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 					n := nPendingSamples + nPendingExemplars + nPendingHistograms
 					s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
 					level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
 						"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
-				case MinStrings:
-					nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
+				case Version2:
					nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
 					n := nPendingSamples + nPendingExemplars + nPendingHistograms
-					s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf)
+					s.sendV2Samples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf)
 					symbolTable.clear()
 				}
 			}
@@ -1474,6 +1532,7 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
 		// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
 		// stop reading from the queue. This makes it safe to reference pendingSamples by index.
 		pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
+
 		switch d.sType {
 		case tSample:
 			pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
@@ -1499,31 +1558,28 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
 	return nPendingSamples, nPendingExemplars, nPendingHistograms
 }
 
-func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
+func (s *shards) sendSamples(ctx context.Context, series []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
 	begin := time.Now()
 	// Build the WriteRequest with no metadata.
 	// Failing to build the write request is non-recoverable, since it will
 	// only error if marshaling the proto to bytes fails.
-	req, highest, err := buildWriteRequest(samples, nil, pBuf, buf)
+	req, highest, err := buildWriteRequest(series, nil, pBuf, buf)
 	if err == nil {
-		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
+		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, 0, highest)
 	}
-	s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
+	s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin))
 }
 
-func (s *shards) sendMinStrSamples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) {
+func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte) {
 	begin := time.Now()
-	// Build the ReducedWriteRequest with no metadata.
-	// Failing to build the write request is non-recoverable, since it will
-	// only error if marshaling the proto to bytes fails.
 	req, highest, err := buildMinimizedWriteRequestStr(samples, labels, pBuf, buf)
 	if err == nil {
-		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest)
+		err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, metadataCount, highest)
 	}
-	s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin))
+	s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin))
 }
 
-func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount int, duration time.Duration) {
+func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
 	if err != nil {
 		level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
 		s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
@@ -1533,7 +1589,7 @@ func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exem
 
 	// These counters are used to calculate the dynamic sharding, and as such
 	// should be maintained irrespective of success or failure.
-	s.qm.dataOut.incr(int64(sampleCount + exemplarCount + histogramCount))
+	s.qm.dataOut.incr(int64(sampleCount + exemplarCount + histogramCount + metadataCount))
 	s.qm.dataOutDuration.incr(int64(duration))
 	s.qm.lastSendTimestamp.Store(time.Now().Unix())
 	// Pending samples/exemplars/histograms also should be subtracted, as an error means
@@ -1547,8 +1603,9 @@ func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exem
 }
 
 // sendSamples to the remote storage with backoff for recoverable errors.
-func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, sampleCount, exemplarCount, histogramCount int, highestTimestamp int64) error {
+func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, sampleCount, exemplarCount, histogramCount, metadataCount int, highestTimestamp int64) error {
 	reqSize := len(rawReq)
 
 	// An anonymous function allows us to defer the completion of our per-try spans
 	// without causing a memory leak, and it has the nice effect of not propagating any
 	// parameters for sendSamplesWithBackoff/3.
@@ -1575,6 +1632,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, samp
 		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
 		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
 		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
+		s.qm.metrics.metadataTotal.Add(float64(metadataCount))
 		err := s.qm.client().Store(ctx, rawReq, try)
 		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@@ -1605,10 +1663,18 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, samp
 	return err
 }
 
-func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
-	var nPendingSamples, nPendingExemplars, nPendingHistograms int
+func populateV2TimeSeries(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) {
+	var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
 	for nPending, d := range batch {
 		pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
+		// todo: should we also safeguard against empty metadata here?
+		if d.metadata != nil {
+			pendingData[nPending].Metadata.Type = metricTypeToMetricTypeProtoV2(d.metadata.Type)
+			pendingData[nPending].Metadata.HelpRef = symbolTable.RefStr(d.metadata.Help)
+			pendingData[nPending].Metadata.UnitRef = symbolTable.RefStr(d.metadata.Unit)
+			nPendingMetadata++
+		}
 		if sendExemplars {
 			pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
 		}
@@ -1621,7 +1687,7 @@ func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeri
 		// stop reading from the queue. This makes it safe to reference pendingSamples by index.
 		// pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
-		pendingData[nPending].LabelsRefs = labelsToUint32SliceStr(d.seriesLabels, symbolTable, pendingData[nPending].LabelsRefs)
+		pendingData[nPending].LabelsRefs = labelsToLabelsProtoV2Refs(d.seriesLabels, symbolTable, pendingData[nPending].LabelsRefs)
 		switch d.sType {
 		case tSample:
 			pendingData[nPending].Samples = append(pendingData[nPending].Samples, writev2.Sample{
@@ -1631,7 +1697,7 @@ func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeri
 			nPendingSamples++
 		case tExemplar:
 			pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, writev2.Exemplar{
-				LabelsRefs: labelsToUint32SliceStr(d.exemplarLabels, symbolTable, nil), // TODO: optimize, reuse slice
+				LabelsRefs: labelsToLabelsProtoV2Refs(d.exemplarLabels, symbolTable, nil), // TODO: optimize, reuse slice
 				Value:      d.value,
 				Timestamp:  d.timestamp,
 			})
@@ -1642,9 +1708,12 @@ func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeri
 		case tFloatHistogram:
 			pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToMinHistogramProto(d.timestamp, d.floatHistogram))
 			nPendingHistograms++
+		case tMetadata:
+			// TODO: log or return an error?
+			// we shouldn't receive metadata type data here, it should already be inserted into the timeSeries
 		}
 	}
-	return nPendingSamples, nPendingExemplars, nPendingHistograms
+	return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata
 }
 
 func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
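
A note on the type conversion used by populateV2TimeSeries above: metricTypeToMetricTypeProtoV2 upper-cases the model.MetricType string and looks it up in the generated enum value map, while metricTypeFromProtoV2Equivalent lower-cases the enum name on the way back. The sketch below is not the Prometheus code; it uses a hand-written stand-in enum map (names and values are assumptions) purely to show that round trip.

    package main

    import (
        "fmt"
        "strings"
    )

    // Stand-in for the generated proto enum name<->value maps that the codec
    // helpers consult; the real maps live in prompb/writev2.
    var metricTypeValue = map[string]int32{
        "UNKNOWN": 0, "COUNTER": 1, "GAUGE": 2, "HISTOGRAM": 3,
        "GAUGEHISTOGRAM": 4, "SUMMARY": 5, "INFO": 6, "STATESET": 7,
    }

    var metricTypeName = map[int32]string{}

    func init() {
        for n, v := range metricTypeValue {
            metricTypeName[v] = n
        }
    }

    // toProtoType mirrors metricTypeToMetricTypeProtoV2: a model metric type is a
    // lower-case string, so it is upper-cased and looked up in the enum map.
    func toProtoType(t string) int32 {
        v, ok := metricTypeValue[strings.ToUpper(t)]
        if !ok {
            return 0 // unspecified/unknown
        }
        return v
    }

    // fromProtoType mirrors metricTypeFromProtoV2Equivalent: the enum name is
    // lower-cased back into a model metric type string.
    func fromProtoType(v int32) string {
        return strings.ToLower(metricTypeName[v])
    }

    func main() {
        v := toProtoType("counter")
        fmt.Println(v, fromProtoType(v)) // 1 counter
    }
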


@@ -76,11 +76,12 @@ func TestSampleDelivery(t *testing.T) {
 		{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
 		{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
-		{rwFormat: MinStrings, samples: true, exemplars: false, histograms: false, name: "interned samples only"},
-		{rwFormat: MinStrings, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"},
-		{rwFormat: MinStrings, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
-		{rwFormat: MinStrings, samples: false, exemplars: false, histograms: true, name: "interned histograms only"},
-		{rwFormat: MinStrings, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"},
+		// TODO: update some portion of this test to check for the 2.0 metadata
+		{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only", rwFormat: Version2},
+		{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms", rwFormat: Version2},
+		{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only", rwFormat: Version2},
+		{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only", rwFormat: Version2},
+		{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only", rwFormat: Version2},
 	}
 
 	// Let's create an even number of send batches so we don't run into the
@@ -107,11 +108,12 @@ func TestSampleDelivery(t *testing.T) {
 	for _, tc := range testcases {
 		t.Run(tc.name, func(t *testing.T) {
 			dir := t.TempDir()
-			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.rwFormat)
+			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.rwFormat, true)
 			defer s.Close()
 
 			var (
 				series          []record.RefSeries
+				metadata        []record.RefMetadata
 				samples         []record.RefSample
 				exemplars       []record.RefExemplar
 				histograms      []record.RefHistogramSample
@@ -131,6 +133,7 @@ func TestSampleDelivery(t *testing.T) {
 			if tc.floatHistograms {
 				_, floatHistograms, series = createHistograms(n, n, true)
 			}
+			metadata = createSeriesMetadata(series)
 
 			// Apply new config.
 			queueConfig.Capacity = len(samples)
@@ -144,6 +147,7 @@ func TestSampleDelivery(t *testing.T) {
 			qm.SetClient(c)
 			qm.StoreSeries(series, 0)
+			qm.StoreMetadata(metadata)
 
 			// Send first half of data.
 			c.expectSamples(samples[:len(samples)/2], series)
@@ -186,12 +190,12 @@ type perRequestWriteClient struct {
 func newPerRequestWriteClient(expectUnorderedRequests bool) *perRequestWriteClient {
 	return &perRequestWriteClient{
 		expectUnorderedRequests: expectUnorderedRequests,
-		TestWriteClient:         NewTestWriteClient(MinStrings),
+		TestWriteClient:         NewTestWriteClient(Version2),
 	}
 }
 
 func (c *perRequestWriteClient) expectRequestSamples(ss []record.RefSample, series []record.RefSeries) {
-	tc := NewTestWriteClient(MinStrings)
+	tc := NewTestWriteClient(Version2)
 	c.requests = append(c.requests, tc)
 
 	c.expectedSeries = series
@@ -334,7 +338,7 @@ func TestHistogramSampleBatching(t *testing.T) {
 			mcfg := config.DefaultMetadataConfig
 			metrics := newQueueManagerMetrics(nil, "", "")
-			m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), tc.queueConfig, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, MinStrings)
+			m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), tc.queueConfig, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version2)
 			m.StoreSeries(series, 0)
 			m.Start()
@@ -346,7 +350,7 @@ func TestHistogramSampleBatching(t *testing.T) {
 }
 
 func TestMetadataDelivery(t *testing.T) {
-	c := NewTestWriteClient(Base1)
+	c := NewTestWriteClient(Version1)
 
 	dir := t.TempDir()
@@ -369,7 +373,7 @@ func TestMetadataDelivery(t *testing.T) {
 		})
 	}
 
-	m.AppendMetadata(context.Background(), metadata)
+	m.AppendWatcherMetadata(context.Background(), metadata)
 
 	require.Len(t, c.receivedMetadata, numMetadata)
// One more write than the rounded quotient should be performed in order to get samples that didn't // One more write than the rounded quotient should be performed in order to get samples that didn't
@ -379,8 +383,48 @@ func TestMetadataDelivery(t *testing.T) {
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
} }
func TestWALMetadataDelivery(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version2, true)
defer s.Close()
cfg := config.DefaultQueueConfig
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg.MaxShards = 1
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
writeConfig.QueueConfig = cfg
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
writeConfig,
},
}
num := 3
_, series := createTimeseries(0, num)
metadata := createSeriesMetadata(series)
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient(Version1)
qm.SetClient(c)
qm.StoreSeries(series, 0)
qm.StoreMetadata(metadata)
require.Len(t, qm.seriesLabels, num)
require.Len(t, qm.seriesMetadata, num)
c.waitForExpectedData(t, 30*time.Second)
}
func TestSampleDeliveryTimeout(t *testing.T) { func TestSampleDeliveryTimeout(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration // Let's send one less sample than batch size, and wait the timeout duration
n := 9 n := 9
@ -412,7 +456,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
} }
func TestSampleDeliveryOrder(t *testing.T) { func TestSampleDeliveryOrder(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
ts := 10 ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
@ -462,7 +506,7 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version1)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
@ -500,7 +544,7 @@ func TestSeriesReset(t *testing.T) {
cfg := testDefaultQueueConfig() cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version1)
for i := 0; i < numSegments; i++ { for i := 0; i < numSegments; i++ {
series := []record.RefSeries{} series := []record.RefSeries{}
for j := 0; j < numSeries; j++ { for j := 0; j < numSeries; j++ {
@ -514,7 +558,7 @@ func TestSeriesReset(t *testing.T) {
} }
func TestReshard(t *testing.T) { func TestReshard(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
size := 10 // Make bigger to find more races. size := 10 // Make bigger to find more races.
nSeries := 6 nSeries := 6
@ -557,12 +601,11 @@ func TestReshard(t *testing.T) {
} }
func TestReshardRaceWithStop(t *testing.T) { func TestReshardRaceWithStop(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
c := NewTestWriteClient(rwFormat) c := NewTestWriteClient(rwFormat)
var m *QueueManager var m *QueueManager
h := sync.Mutex{} h := sync.Mutex{}
h.Lock() h.Lock()
cfg := testDefaultQueueConfig() cfg := testDefaultQueueConfig()
@ -596,7 +639,7 @@ func TestReshardRaceWithStop(t *testing.T) {
} }
func TestReshardPartialBatch(t *testing.T) { func TestReshardPartialBatch(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
samples, series := createTimeseries(1, 10) samples, series := createTimeseries(1, 10)
@ -642,7 +685,7 @@ func TestReshardPartialBatch(t *testing.T) {
// where a large scrape (> capacity + max samples per send) is appended at the // where a large scrape (> capacity + max samples per send) is appended at the
// same time as a batch times out according to the batch send deadline. // same time as a batch times out according to the batch send deadline.
func TestQueueFilledDeadlock(t *testing.T) { func TestQueueFilledDeadlock(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
samples, series := createTimeseries(50, 1) samples, series := createTimeseries(50, 1)
@ -684,7 +727,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
} }
func TestReleaseNoninternedString(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { for _, rwFormat := range []RemoteWriteFormat{Version1, Version2} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
cfg := testDefaultQueueConfig() cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
@ -693,7 +736,6 @@ func TestReleaseNoninternedString(t *testing.T) {
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
for i := 1; i < 1000; i++ { for i := 1; i < 1000; i++ {
m.StoreSeries([]record.RefSeries{ m.StoreSeries([]record.RefSeries{
{ {
@ -739,8 +781,8 @@ func TestShouldReshard(t *testing.T) {
for _, c := range cases { for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
// todo: test with new proto type(s) // todo: test with new proto type(s)
client := NewTestWriteClient(Base1) client := NewTestWriteClient(Version1)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version1)
m.numShards = c.startingShards m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn) m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut) m.dataOut.incr(c.samplesOut)
@ -856,6 +898,20 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.
return histograms, nil, series return histograms, nil, series
} }
func createSeriesMetadata(series []record.RefSeries) []record.RefMetadata {
metas := make([]record.RefMetadata, 0, len(series))
for _, s := range series {
metas = append(metas, record.RefMetadata{
Ref: s.Ref,
Type: uint8(record.Counter),
Unit: "unit text",
Help: "help text",
})
}
return metas
}
func createTestClassicHistogram(buckets int) ([]record.RefSeries, []record.RefSample) { func createTestClassicHistogram(buckets int) ([]record.RefSeries, []record.RefSample) {
samples := make([]record.RefSample, buckets+2) samples := make([]record.RefSample, buckets+2)
series := make([]record.RefSeries, buckets+2) series := make([]record.RefSeries, buckets+2)
@ -1041,10 +1097,10 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
var reqProto *prompb.WriteRequest var reqProto *prompb.WriteRequest
switch c.rwFormat { switch c.rwFormat {
case Base1: case Version1:
reqProto = &prompb.WriteRequest{} reqProto = &prompb.WriteRequest{}
err = proto.Unmarshal(reqBuf, reqProto) err = proto.Unmarshal(reqBuf, reqProto)
case MinStrings: case Version2:
var reqMin writev2.WriteRequest var reqMin writev2.WriteRequest
err = proto.Unmarshal(reqBuf, &reqMin) err = proto.Unmarshal(reqBuf, &reqMin)
if err == nil { if err == nil {
@ -1164,7 +1220,7 @@ func BenchmarkSampleSend(b *testing.B) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
// todo: test with new proto type(s) // todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version1)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
// These should be received by the client. // These should be received by the client.
@ -1211,7 +1267,7 @@ func BenchmarkStartup(b *testing.B) {
// todo: test with new proto type(s) // todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, logger, dir, m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, Version1)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2] m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run() err := m.watcher.Run()
@ -1295,7 +1351,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
// todo: test with new proto type(s) // todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version1)
// Need to start the queue manager so the proper metrics are initialized. // Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual // However we can stop it right away since we don't need to do any actual
@ -1364,8 +1420,8 @@ func TestCalculateDesiredShards(t *testing.T) {
} }
func TestCalculateDesiredShardsDetail(t *testing.T) { func TestCalculateDesiredShardsDetail(t *testing.T) {
c := NewTestWriteClient(Base1) c := NewTestWriteClient(Version1)
cfg := testDefaultQueueConfig() cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
dir := t.TempDir() dir := t.TempDir()
@ -1373,7 +1429,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
// todo: test with new proto type(s) // todo: test with new proto type(s)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version1)
for _, tc := range []struct { for _, tc := range []struct {
name string name string
@ -1713,14 +1769,14 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
// Warmup buffers // Warmup buffers
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
populateMinimizedTimeSeriesStr(&symbolTable, tc.batch, seriesBuff, true, true) populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
buildMinimizedWriteRequestStr(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff) buildMinimizedWriteRequestStr(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff)
} }
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0 totalSize := 0
for j := 0; j < b.N; j++ { for j := 0; j < b.N; j++ {
populateMinimizedTimeSeriesStr(&symbolTable, tc.batch, seriesBuff, true, true) populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer() b.ResetTimer()
req, _, err := buildMinimizedWriteRequestStr(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff) req, _, err := buildMinimizedWriteRequestStr(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff)
if err != nil { if err != nil {
View file
@ -92,7 +92,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run("", func(t *testing.T) { t.Run("", func(t *testing.T) {
// todo: test with new format type(s)? // todo: test with new format type(s)?
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs, RemoteReadConfigs: tc.cfgs,
View file
@ -62,7 +62,7 @@ type Storage struct {
} }
// NewStorage returns a remote.Storage. // NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat) *Storage { func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat, metadataInWAL bool) *Storage {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger, logger: logger,
localStartTimeCallback: stCallback, localStartTimeCallback: stCallback,
} }
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, rwFormat) s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, rwFormat, metadataInWAL)
return s return s
} }
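For orientation, a caller adopting the widened signature might construct remote storage like the sketch below; walDir, startTime, and scrapeManager are placeholder variables for this example, and the final argument enables metadata records in the WAL.

rs := remote.NewStorage(log.NewNopLogger(), prometheus.DefaultRegisterer, startTime,
	walDir, 30*time.Second, scrapeManager, remote.Version2, true)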
View file
@ -30,7 +30,7 @@ func TestStorageLifecycle(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{ RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -58,7 +58,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.GlobalConfig{}, GlobalConfig: config.GlobalConfig{},
@ -80,7 +80,7 @@ func TestFilterExternalLabels(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.GlobalConfig{ GlobalConfig: config.GlobalConfig{
@ -106,7 +106,7 @@ func TestIgnoreExternalLabels(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.GlobalConfig{ GlobalConfig: config.GlobalConfig{
@ -158,7 +158,7 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig {
// ApplyConfig runs concurrently with Notify // ApplyConfig runs concurrently with Notify
// See https://github.com/prometheus/prometheus/issues/12747 // See https://github.com/prometheus/prometheus/issues/12747
func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) { func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) {
s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, Base1) s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, Version1, false)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2000) wg.Add(2000)
View file
@ -15,6 +15,7 @@ package remote
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"math" "math"
"sync" "sync"
@ -66,6 +67,7 @@ type WriteStorage struct {
dir string dir string
queues map[string]*QueueManager queues map[string]*QueueManager
rwFormat RemoteWriteFormat rwFormat RemoteWriteFormat
metadataInWAL bool
samplesIn *ewmaRate samplesIn *ewmaRate
flushDeadline time.Duration flushDeadline time.Duration
interner *pool interner *pool
@ -77,7 +79,7 @@ type WriteStorage struct {
} }
// NewWriteStorage creates and runs a WriteStorage. // NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat) *WriteStorage { func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat, metadataInWAL bool) *WriteStorage {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
@ -147,6 +149,10 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
newQueues := make(map[string]*QueueManager) newQueues := make(map[string]*QueueManager)
newHashes := []string{} newHashes := []string{}
for _, rwConf := range conf.RemoteWriteConfigs { for _, rwConf := range conf.RemoteWriteConfigs {
// todo: change the rws.rwFormat to a queue config field
if rws.rwFormat > Version1 && !rws.metadataInWAL {
return errors.New("invalid remote write configuration: remote write version 2.0 requires the feature flag for metadata records in the WAL (metadata-wal-records) to be enabled")
}
hash, err := toHash(rwConf) hash, err := toHash(rwConf)
if err != nil { if err != nil {
return err return err
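A test exercising this validation could look roughly like the sketch below (hypothetical, modelled on the existing write storage tests): a v2 write storage constructed without metadata-in-WAL should have ApplyConfig reject the configuration.

s := NewWriteStorage(nil, nil, t.TempDir(), defaultFlushDeadline, nil, Version2, false)
conf := &config.Config{
	GlobalConfig:       config.DefaultGlobalConfig,
	RemoteWriteConfigs: []*config.RemoteWriteConfig{baseRemoteWriteConfig("http://test-storage.com")},
}
// Version2 without the metadata-wal-records feature flag is rejected.
require.Error(t, s.ApplyConfig(conf))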
View file
@ -78,9 +78,9 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO: this should eventually be done via content negotiation/looking at the header // TODO: this should eventually be done via content negotiation/looking at the header
switch h.rwFormat { switch h.rwFormat {
case Base1: case Version1:
req, err = DecodeWriteRequest(r.Body) req, err = DecodeWriteRequest(r.Body)
case MinStrings: case Version2:
reqMinStr, err = DecodeMinimizedWriteRequestStr(r.Body) reqMinStr, err = DecodeMinimizedWriteRequestStr(r.Body)
} }
@ -92,9 +92,9 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO: this should eventually be done detecting the format version above // TODO: this should eventually be done detecting the format version above
switch h.rwFormat { switch h.rwFormat {
case Base1: case Version1:
err = h.write(r.Context(), req) err = h.write(r.Context(), req)
case MinStrings: case Version2:
err = h.writeMinStr(r.Context(), reqMinStr) err = h.writeMinStr(r.Context(), reqMinStr)
} }
@ -348,7 +348,7 @@ func (h *writeHandler) writeMinStr(ctx context.Context, req *writev2.WriteReques
}() }()
for _, ts := range req.Timeseries { for _, ts := range req.Timeseries {
ls := Uint32StrRefToLabels(req.Symbols, ts.LabelsRefs) ls := labelProtosV2ToLabels(ts.LabelsRefs, req.Symbols)
err := h.appendMinSamples(app, ts.Samples, ls) err := h.appendMinSamples(app, ts.Samples, ls)
if err != nil { if err != nil {
@ -356,7 +356,7 @@ func (h *writeHandler) writeMinStr(ctx context.Context, req *writev2.WriteReques
} }
for _, ep := range ts.Exemplars { for _, ep := range ts.Exemplars {
e := minExemplarProtoToExemplar(ep, req.Symbols) e := exemplarProtoV2ToExemplar(ep, req.Symbols)
h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs) h.appendExemplar(app, e, ls, &outOfOrderExemplarErrs)
} }
@ -364,6 +364,12 @@ func (h *writeHandler) writeMinStr(ctx context.Context, req *writev2.WriteReques
if err != nil { if err != nil {
return err return err
} }
m := metadataProtoV2ToMetadata(ts.Metadata, req.Symbols)
if _, err = app.UpdateMetadata(0, ls, m); err != nil {
level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err)
}
} }
if outOfOrderExemplarErrs > 0 { if outOfOrderExemplarErrs > 0 {
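With this change the receiving side passes per-series metadata to the appender via UpdateMetadata. A toy appender fragment that records what it receives could look like the sketch below (hypothetical; the mockAppendable used by these tests still treats UpdateMetadata as a no-op):

type capturedMeta struct {
	lbls labels.Labels
	meta metadata.Metadata
}

type capturingAppender struct {
	got []capturedMeta
}

// UpdateMetadata records each metadata update instead of discarding it.
func (c *capturingAppender) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
	c.got = append(c.got, capturedMeta{lbls: l, meta: m})
	return 0, nil
}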
View file
@ -46,7 +46,7 @@ func TestRemoteWriteHandler(t *testing.T) {
appendable := &mockAppendable{} appendable := &mockAppendable{}
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -94,7 +94,7 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
appendable := &mockAppendable{} appendable := &mockAppendable{}
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(nil, nil, appendable, MinStrings) handler := NewWriteHandler(nil, nil, appendable, Version2)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -107,30 +107,32 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
k := 0 k := 0
// the reduced write request is equivalent to the write request fixture, // the reduced write request is equivalent to the write request fixture,
// so we can use it to verify the appended data. // so we can use it to verify the appended data.
for _, ts := range writeRequestFixture.Timeseries { for _, ts := range writeRequestMinimizedFixture.Timeseries {
ls := labelProtosToLabels(ts.Labels) ls := labelProtosV2ToLabels(ts.LabelsRefs, writeRequestMinimizedFixture.Symbols)
for _, s := range ts.Samples { for _, s := range ts.Samples {
require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
i++ i++
} }
for _, e := range ts.Exemplars { for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(e.Labels) exemplarLabels := labelProtosV2ToLabels(e.LabelsRefs, writeRequestMinimizedFixture.Symbols)
require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++ j++
} }
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() { if hp.IsFloatHistogram() {
fh := FloatHistogramProtoToFloatHistogram(hp) fh := FloatHistogramProtoV2ToFloatHistogram(hp)
require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else { } else {
h := HistogramProtoToHistogram(hp) h := HistogramProtoV2ToHistogram(hp)
require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
} }
k++ k++
} }
// todo: check for metadata
} }
} }
@ -148,7 +150,7 @@ func TestOutOfOrderSample(t *testing.T) {
latestSample: 100, latestSample: 100,
} }
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -174,7 +176,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
latestExemplar: 100, latestExemplar: 100,
} }
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -198,7 +200,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
latestHistogram: 100, latestHistogram: 100,
} }
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -227,7 +229,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
appendable := &mockAppendable{} appendable := &mockAppendable{}
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
b.ResetTimer() b.ResetTimer()
@ -247,7 +249,7 @@ func TestCommitErr(t *testing.T) {
commitErr: fmt.Errorf("commit error"), commitErr: fmt.Errorf("commit error"),
} }
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -273,7 +275,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close()) require.NoError(b, db.Close())
}) })
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Base1) handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Version1)
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
require.NoError(b, err) require.NoError(b, err)
@ -390,7 +392,7 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
return 0, nil return 0, nil
} }
func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Wire metadata in a mockAppendable field when we get around to handling metadata in remote_write. // TODO: Wire metadata in a mockAppendable field when we get around to handling metadata in remote_write.
// UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now. // UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now.
return 0, nil return 0, nil
View file
@ -118,7 +118,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1) s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs, RemoteWriteConfigs: tc.cfgs,
@ -141,7 +141,7 @@ func TestRestartOnNameChange(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1) s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
@ -167,7 +167,7 @@ func TestUpdateWithRegisterer(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, Base1) s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, Version1, false)
c1 := &config.RemoteWriteConfig{ c1 := &config.RemoteWriteConfig{
Name: "named", Name: "named",
URL: &common_config.URL{ URL: &common_config.URL{
@ -208,7 +208,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{ RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -226,7 +226,7 @@ func TestUpdateExternalLabels(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, Base1) s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, Version1, false)
externalLabels := labels.FromStrings("external", "true") externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{ conf := &config.Config{
@ -256,7 +256,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Version1, false)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.GlobalConfig{}, GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{ RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -282,7 +282,7 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
// todo: test with new format type(s) // todo: test with new format type(s)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Version1, false)
c0 := &config.RemoteWriteConfig{ c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second), RemoteTimeout: model.Duration(10 * time.Second),
View file
@ -88,7 +88,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *
t.Helper() t.Helper()
dbDir := t.TempDir() dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, remote.Base1) rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, remote.Version1, false)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, rs.Close()) require.NoError(t, rs.Close())
}) })
@ -584,7 +584,7 @@ func TestLockfile(t *testing.T) {
tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) { tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
logger := log.NewNopLogger() logger := log.NewNopLogger()
reg := prometheus.NewRegistry() reg := prometheus.NewRegistry()
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, remote.Base1) rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, remote.Version1, false)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, rs.Close()) require.NoError(t, rs.Close())
}) })
@ -604,7 +604,7 @@ func TestLockfile(t *testing.T) {
func Test_ExistingWAL_NextRef(t *testing.T) { func Test_ExistingWAL_NextRef(t *testing.T) {
dbDir := t.TempDir() dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, remote.Base1) rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, remote.Version1, false)
defer func() { defer func() {
require.NoError(t, rs.Close()) require.NoError(t, rs.Close())
}() }()
View file
@ -56,6 +56,7 @@ type WriteTo interface {
AppendHistograms([]record.RefHistogramSample) bool AppendHistograms([]record.RefHistogramSample) bool
AppendFloatHistograms([]record.RefFloatHistogramSample) bool AppendFloatHistograms([]record.RefFloatHistogramSample) bool
StoreSeries([]record.RefSeries, int) StoreSeries([]record.RefSeries, int)
StoreMetadata([]record.RefMetadata)
// Next two methods are intended for garbage-collection: first we call // Next two methods are intended for garbage-collection: first we call
// UpdateSeriesSegment on all current series // UpdateSeriesSegment on all current series
@ -87,6 +88,7 @@ type Watcher struct {
lastCheckpoint string lastCheckpoint string
sendExemplars bool sendExemplars bool
sendHistograms bool sendHistograms bool
sendMetadata bool
metrics *WatcherMetrics metrics *WatcherMetrics
readerMetrics *LiveReaderMetrics readerMetrics *LiveReaderMetrics
@ -169,7 +171,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
} }
// NewWatcher creates a new WAL watcher for a given WriteTo. // NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher { func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
@ -182,6 +184,7 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
name: name, name: name,
sendExemplars: sendExemplars, sendExemplars: sendExemplars,
sendHistograms: sendHistograms, sendHistograms: sendHistograms,
sendMetadata: sendMetadata,
readNotify: make(chan struct{}), readNotify: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -540,6 +543,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
histogramsToSend []record.RefHistogramSample histogramsToSend []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample floatHistograms []record.RefFloatHistogramSample
floatHistogramsToSend []record.RefFloatHistogramSample floatHistogramsToSend []record.RefFloatHistogramSample
metadata []record.RefMetadata
) )
for r.Next() && !isClosed(w.quit) { for r.Next() && !isClosed(w.quit) {
rec := r.Record() rec := r.Record()
@ -651,6 +655,17 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.writer.AppendFloatHistograms(floatHistogramsToSend) w.writer.AppendFloatHistograms(floatHistogramsToSend)
floatHistogramsToSend = floatHistogramsToSend[:0] floatHistogramsToSend = floatHistogramsToSend[:0]
} }
case record.Metadata:
if !w.sendMetadata || !tail {
break
}
meta, err := dec.Metadata(rec, metadata[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.StoreMetadata(meta)
case record.Tombstones: case record.Tombstones:
default: default:
View file
@ -84,6 +84,8 @@ func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) {
wtm.UpdateSeriesSegment(series, index) wtm.UpdateSeriesSegment(series, index)
} }
func (wtm *writeToMock) StoreMetadata(_ []record.RefMetadata) { /* no-op */ }
func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) { func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) {
wtm.seriesLock.Lock() wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock() defer wtm.seriesLock.Unlock()
@ -210,7 +212,7 @@ func TestTailSamples(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true, true)
watcher.SetStartTime(now) watcher.SetStartTime(now)
// Set the Watcher's metrics so they're not nil pointers. // Set the Watcher's metrics so they're not nil pointers.
@ -295,7 +297,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
go watcher.Start() go watcher.Start()
expected := seriesCount expected := seriesCount
@ -384,7 +386,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
readTimeout = time.Second readTimeout = time.Second
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
go watcher.Start() go watcher.Start()
expected := seriesCount * 2 expected := seriesCount * 2
@ -455,7 +457,7 @@ func TestReadCheckpoint(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
go watcher.Start() go watcher.Start()
expectedSeries := seriesCount expectedSeries := seriesCount
@ -524,7 +526,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
} }
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.MaxSegment = -1 watcher.MaxSegment = -1
// Set the Watcher's metrics so they're not nil pointers. // Set the Watcher's metrics so they're not nil pointers.
@ -597,7 +599,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
readTimeout = time.Second readTimeout = time.Second
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.MaxSegment = -1 watcher.MaxSegment = -1
go watcher.Start() go watcher.Start()
@ -676,7 +678,7 @@ func TestRun_StartupTime(t *testing.T) {
require.NoError(t, w.Close()) require.NoError(t, w.Close())
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.MaxSegment = segments watcher.MaxSegment = segments
watcher.setMetrics() watcher.setMetrics()
View file
@ -462,7 +462,7 @@ func TestEndpoints(t *testing.T) {
// TODO: test with other proto format(s)? // TODO: test with other proto format(s)?
remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) { remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
return 0, nil return 0, nil
}, dbDir, 1*time.Second, nil, remote.Base1) }, dbDir, 1*time.Second, nil, remote.Version1, false)
err = remote.ApplyConfig(&config.Config{ err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{ RemoteReadConfigs: []*config.RemoteReadConfig{
View file
@ -136,7 +136,7 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
nil, nil,
nil, nil,
false, false,
remote.Base1, remote.Version1,
false, // Disable experimental reduced remote write proto support. false, // Disable experimental reduced remote write proto support.
) )