ct: Support CTs in WAL; change sample record; use in PRW 2.0

Fixes https://github.com/prometheus/prometheus/issues/14218 and https://github.com/prometheus/prometheus/issues/14220

Rebased version of https://github.com/prometheus/prometheus/pull/15254 with improvements.

This change does the following:
- Change the appender interface to be CT-aware (optional CT); see the sketch below.
- Add the created-timestamp-per-sample feature flag.
- Add a new sample record, used only if a CT is appended with the sample.
- Make Remote Write 2.0 aware of CTs.
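
A minimal sketch of how a caller could use the extended interface (illustrative only, not part of this diff; the helper name and labels are made up):

```go
package example

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// appendWithCreatedTimestamp appends one float sample together with its
// created timestamp. ct is in milliseconds; ct == 0 means "no CT".
func appendWithCreatedTimestamp(ctx context.Context, db storage.Appendable, t, ct int64, v float64) error {
	app := db.Appender(ctx)
	lbls := labels.FromStrings("__name__", "http_requests_total", "job", "api")
	if _, err := app.AppendWithCT(0, lbls, t, ct, v); err != nil {
		_ = app.Rollback()
		return err
	}
	return app.Commit()
}
```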

Signed-off-by: Ridwan Sharif <ridwanmsharif@google.com>
Signed-off-by: bwplotka <bwplotka@gmail.com>

# Conflicts:
#	cmd/prometheus/main.go
#	scrape/helpers_test.go
#	storage/remote/write_handler_test.go
Ridwan Sharif 2024-10-24 18:19:20 +00:00 committed by bwplotka
parent 0e4e5a71bd
commit aee78bdb31
25 changed files with 517 additions and 95 deletions


@ -257,9 +257,22 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "ooo-native-histograms":
c.tsdb.EnableOOONativeHistograms = true
logger.Info("Experimental out-of-order native histogram ingestion enabled. This will only take effect if OutOfOrderTimeWindow is > 0 and if EnableNativeHistograms = true")
case "created-timestamp-per-sample":
c.scrape.EnableCreatedTimestampPerSample = true
// TODO(bwplotka): Add support for CT per sample in:
// * Native histogram WAL records.
// * PRW and OTLP receiving
// * PromQL engine (accessing from WAL)
// * TSDB storage
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
logger.Info("Experimental created timestamp per sample enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "created-timestamp-zero-ingestion":
c.scrape.EnableCreatedTimestampZeroIngestion = true
c.web.CTZeroIngestionEnabled = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
@ -1644,6 +1657,10 @@ func (n notReadyAppender) Append(_ storage.SeriesRef, _ labels.Labels, _ int64,
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) AppendWithCT(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ float64) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
@ -1652,6 +1669,10 @@ func (n notReadyAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels,
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) AppendHistogramWithCT(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}


@ -79,15 +79,50 @@ Enables PromQL functions that are considered experimental. These functions
might change their name, syntax, or semantics. They might also get removed
entirely.
## Created Timestamps per sample
`--enable-feature=created-timestamp-per-sample`
When enabled, Prometheus stores created timestamps (CTs) per sample, which
allows persisting CTs across restarts in the WAL and block storage,
as well as using them in Remote Write 2.0 and the PromQL engine.
This flag must be enabled if you would like to send created timestamps using the new Remote Write 2.0.
Important: the enhanced sample records are only readable by Prometheus v3.3 and later.
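
For illustration, a rough sketch of a Remote Write 2.0 request carrying a created timestamp (field names taken from the `writev2` proto used elsewhere in this change; the series and values are made up):

```go
package example

import (
	writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
)

// buildV2Request builds a single-series v2 write request whose sample carries
// a created timestamp. All timestamps are in milliseconds.
func buildV2Request(now, created int64) writev2.Request {
	// Per the Remote Write 2.0 spec, symbols[0] must be the empty string;
	// labels reference name/value pairs by index into this table.
	symbols := []string{"", "__name__", "http_requests_total", "job", "api"}
	return writev2.Request{
		Symbols: symbols,
		Timeseries: []writev2.TimeSeries{{
			LabelsRefs:       []uint32{1, 2, 3, 4},
			Samples:          []writev2.Sample{{Timestamp: now, Value: 42}},
			CreatedTimestamp: created,
		}},
	}
}
```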
### Current State
This flag is intended to cover all experimental features related to the
created-timestamps-per-sample approach.
At the moment, only WAL storage is implemented: CTs are stored per sample in a
new samples WAL record and are used by Remote Write 2.0 endpoints if configured.
In the future, we intend to make the PromQL engine use CTs from sample WAL records,
as well as store them in TSDB block storage.
Enable the `created-timestamp-zero-ingestion` flag explained below if you wish to make
PromQL aware of CTs in the form of synthetic zero samples, with all its consequences.
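
For reference, a rough sketch of how the new samples WAL record mentioned above round-trips through the internal `tsdb/record` package as extended by this change (illustration only, not a user-facing API; assumes the `record.NewDecoder(labels.NewSymbolTable())` constructor from current main):

```go
package example

import (
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/record"
)

// roundTripSamples encodes samples carrying CTs into a WAL record and decodes
// them back. The encoder switches to the SamplesWithCT record type as soon as
// any sample has a non-zero CT; the decoder accepts both record types.
func roundTripSamples() ([]record.RefSample, error) {
	samples := []record.RefSample{
		{Ref: chunks.HeadSeriesRef(1), T: 100, CT: 10, V: 1.5},
		{Ref: chunks.HeadSeriesRef(1), T: 200, CT: 10, V: 2.5},
	}
	var enc record.Encoder
	rec := enc.Samples(samples, nil)

	dec := record.NewDecoder(labels.NewSymbolTable())
	return dec.Samples(rec, nil)
}
```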
### Client Considerations
Currently, Prometheus supports created timestamps only on the traditional
Prometheus protobuf protocol. As a result, when this feature is enabled,
the Prometheus protobuf scrape protocol will be prioritized
(see the `scrape_config.scrape_protocols` setting for more details). The community is
working on [making CTs accessible through OpenMetrics 2.0 as well]().
Besides enabling this feature in Prometheus, CTs need to be exposed by the application being scraped.
## Created Timestamps Zero Injection
`--enable-feature=created-timestamp-zero-ingestion`
Enables ingestion of created timestamp. Created timestamps are injected as 0 valued samples when appropriate. See [PromCon talk](https://youtu.be/nWf0BfQ5EEA) for details.
Enables ingestion of created timestamp (CT). Created timestamps are injected as
0 valued samples when appropriate. See [PromCon talk](https://youtu.be/nWf0BfQ5EEA) for details.
Currently Prometheus supports created timestamps only on the traditional Prometheus Protobuf protocol (WIP for other protocols). As a result, when enabling this feature, the Prometheus protobuf scrape protocol will be prioritized (See `scrape_config.scrape_protocols` settings for more details).
Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped.
The [client considerations for CTs](#client-considerations) also apply here.
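
A minimal sketch of what zero ingestion does on the append path (simplified; the helper name is made up and error handling is reduced): a synthetic zero-valued sample is appended at the created timestamp before the real sample.

```go
package example

import (
	"errors"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// appendCounterWithZeroCT appends a counter sample at t and, if a created
// timestamp ct is known, first injects a synthetic zero sample at ct.
// Out-of-order CTs are ignored, mirroring the scrape loop behaviour.
func appendCounterWithZeroCT(app storage.Appender, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
	var ref storage.SeriesRef
	if ct != 0 {
		var err error
		ref, err = app.AppendCTZeroSample(ref, lset, t, ct)
		if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
			return 0, err
		}
	}
	return app.Append(ref, lset, t, v)
}
```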
## Concurrent evaluation of independent rules


@ -49,6 +49,10 @@ func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (s
return 1, nil
}
func (a nopAppender) AppendWithCT(storage.SeriesRef, labels.Labels, int64, int64, float64) (storage.SeriesRef, error) {
return 6, nil
}
func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) {
return 2, nil
}
@ -57,7 +61,11 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h
return 3, nil
}
func (a nopAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
func (a nopAppender) AppendHistogramWithCT(storage.SeriesRef, labels.Labels, int64, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 7, nil
}
func (a nopAppender) AppendHistogramCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, nil
}
@ -74,7 +82,7 @@ func (a nopAppender) Rollback() error { return nil }
type floatSample struct {
metric labels.Labels
t int64
ct, t int64
f float64
}
@ -85,7 +93,7 @@ func equalFloatSamples(a, b floatSample) bool {
type histogramSample struct {
metric labels.Labels
t int64
ct, t int64
h *histogram.Histogram
fh *histogram.FloatHistogram
}
@ -140,10 +148,15 @@ type collectResultAppender struct {
func (a *collectResultAppender) SetOptions(_ *storage.AppendOptions) {}
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return a.AppendWithCT(ref, lset, t, 0, v)
}
func (a *collectResultAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingFloats = append(a.pendingFloats, floatSample{
metric: lset,
ct: ct,
t: t,
f: v,
})
@ -155,7 +168,7 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels
return ref, nil
}
ref, err := a.next.Append(ref, lset, t, v)
ref, err := a.next.AppendWithCT(ref, lset, t, ct, v)
if err != nil {
return 0, err
}
@ -174,14 +187,18 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
}
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return a.AppendHistogramWithCT(ref, l, t, 0, h, fh)
}
func (a *collectResultAppender) AppendHistogramWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, metric: l})
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, ct: ct, metric: l})
if a.next == nil {
return 0, nil
}
return a.next.AppendHistogram(ref, l, t, h, fh)
return a.next.AppendHistogramWithCT(ref, l, t, ct, h, fh)
}
func (a *collectResultAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, _, ct int64, h *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {


@ -81,17 +81,23 @@ type Options struct {
AppendMetadata bool
// Option to increase the interval used by scrape manager to throttle target groups updates.
DiscoveryReloadInterval model.Duration
// Option to enable the ingestion of the created timestamp as a synthetic zero sample.
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
EnableCreatedTimestampZeroIngestion bool
// Option to enable the ingestion of native histograms.
EnableNativeHistogramsIngestion bool
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption
// private option for testability.
skipOffsetting bool
// ---Feature flags:
// EnableNativeHistogramsIngestion enables the ingestion of native histograms.
EnableNativeHistogramsIngestion bool
// EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample.
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
EnableCreatedTimestampZeroIngestion bool
// EnableCreatedTimestampPerSample enables the ingestion of the created timestamp per sample,
// through the TSDB AppendWithCT and AppendHistogramWithCT methods.
EnableCreatedTimestampPerSample bool
}
// Manager maintains a set of scrape pools and manages start/stop cycles


@ -195,6 +195,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.convertClassicHistToNHCB,
options.EnableNativeHistogramsIngestion,
options.EnableCreatedTimestampZeroIngestion,
options.EnableCreatedTimestampPerSample,
options.ExtraMetrics,
options.AppendMetadata,
opts.target,
@ -916,6 +917,7 @@ type scrapeLoop struct {
// Feature flagged options.
enableNativeHistogramIngestion bool
enableCTZeroIngestion bool
enableCTPerSample bool
appender func(ctx context.Context) storage.Appender
symbolTable *labels.SymbolTable
@ -1223,6 +1225,7 @@ func newScrapeLoop(ctx context.Context,
convertClassicHistToNHCB bool,
enableNativeHistogramIngestion bool,
enableCTZeroIngestion bool,
enableCTPerSample bool,
reportExtraMetrics bool,
appendMetadataToWAL bool,
target *Target,
@ -1279,6 +1282,7 @@ func newScrapeLoop(ctx context.Context,
convertClassicHistToNHCB: convertClassicHistToNHCB,
enableNativeHistogramIngestion: enableNativeHistogramIngestion,
enableCTZeroIngestion: enableCTZeroIngestion,
enableCTPerSample: enableCTPerSample,
reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL,
metrics: metrics,
@ -1746,6 +1750,12 @@ loop:
if seriesAlreadyScraped && parsedTimestamp == nil {
err = storage.ErrDuplicateSampleForTimestamp
} else {
var ct int64
if sl.enableCTPerSample {
if ctMs := p.CreatedTimestamp(); ctMs != nil {
ct = *ctMs
}
}
if sl.enableCTZeroIngestion {
if ctMs := p.CreatedTimestamp(); ctMs != nil {
if isHistogram && sl.enableNativeHistogramIngestion {
@ -1760,19 +1770,19 @@ loop:
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
sl.l.Debug("Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
sl.l.Debug("Error when appending CT in scrape loop", "series", string(met), "ct", ct, "t", t, "err", err)
}
}
}
if isHistogram && sl.enableNativeHistogramIngestion {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
ref, err = app.AppendHistogramWithCT(ref, lset, t, ct, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
ref, err = app.AppendHistogramWithCT(ref, lset, t, ct, nil, fh)
}
} else {
ref, err = app.Append(ref, lset, t, val)
ref, err = app.AppendWithCT(ref, lset, t, ct, val)
}
}


@ -958,6 +958,7 @@ func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper s
false,
false,
false,
false,
true,
nil,
false,
@ -1105,6 +1106,7 @@ func TestScrapeLoopRun(t *testing.T) {
false,
false,
false,
false,
nil,
false,
scrapeMetrics,
@ -1252,6 +1254,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
false,
false,
false,
false,
nil,
false,
scrapeMetrics,
@ -2815,6 +2818,10 @@ type errorAppender struct {
}
func (app *errorAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return app.AppendWithCT(ref, lset, t, 0, v)
}
func (app *errorAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
switch lset.Get(model.MetricNameLabel) {
case "out_of_order":
return 0, storage.ErrOutOfOrderSample
@ -2823,7 +2830,7 @@ func (app *errorAppender) Append(ref storage.SeriesRef, lset labels.Labels, t in
case "out_of_bounds":
return 0, storage.ErrOutOfBounds
default:
return app.collectResultAppender.Append(ref, lset, t, v)
return app.collectResultAppender.AppendWithCT(ref, lset, t, ct, v)
}
}


@ -332,13 +332,17 @@ type limitAppender struct {
}
func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return app.AppendWithCT(ref, lset, t, 0, v)
}
func (app *limitAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
if !value.IsStaleNaN(v) {
app.i++
if app.i > app.limit {
return 0, errSampleLimit
}
}
ref, err := app.Appender.Append(ref, lset, t, v)
ref, err := app.Appender.AppendWithCT(ref, lset, t, ct, v)
if err != nil {
return 0, err
}
@ -352,11 +356,15 @@ type timeLimitAppender struct {
}
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return app.AppendWithCT(ref, lset, t, 0, v)
}
func (app *timeLimitAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
if t > app.maxTime {
return 0, storage.ErrOutOfBounds
}
ref, err := app.Appender.Append(ref, lset, t, v)
ref, err := app.Appender.AppendWithCT(ref, lset, t, ct, v)
if err != nil {
return 0, err
}
@ -371,6 +379,10 @@ type bucketLimitAppender struct {
}
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return app.AppendHistogramWithCT(ref, lset, t, 0, h, fh)
}
func (app *bucketLimitAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
// Return with an early error if the histogram has too many buckets and the
// schema is not exponential, in which case we can't reduce the resolution.
@ -397,7 +409,7 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe
fh = fh.ReduceResolution(fh.Schema - 1)
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
ref, err := app.Appender.AppendHistogramWithCT(ref, lset, t, ct, h, fh)
if err != nil {
return 0, err
}
@ -411,6 +423,10 @@ type maxSchemaAppender struct {
}
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return app.AppendHistogramWithCT(ref, lset, t, 0, h, fh)
}
func (app *maxSchemaAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema {
h = h.ReduceResolution(app.maxSchema)
@ -421,7 +437,7 @@ func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels
fh = fh.ReduceResolution(app.maxSchema)
}
}
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
ref, err := app.Appender.AppendHistogramWithCT(ref, lset, t, ct, h, fh)
if err != nil {
return 0, err
}


@ -171,6 +171,20 @@ func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float
return ref, nil
}
func (f *fanoutAppender) AppendWithCT(ref SeriesRef, l labels.Labels, t, ct int64, v float64) (SeriesRef, error) {
ref, err := f.primary.AppendWithCT(ref, l, t, ct, v)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.AppendWithCT(ref, l, t, ct, v); err != nil {
return 0, err
}
}
return ref, nil
}
func (f *fanoutAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) {
ref, err := f.primary.AppendExemplar(ref, l, e)
if err != nil {
@ -199,6 +213,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64
return ref, nil
}
func (f *fanoutAppender) AppendHistogramWithCT(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
ref, err := f.primary.AppendHistogramWithCT(ref, l, t, ct, h, fh)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.AppendHistogramWithCT(ref, l, t, ct, h, fh); err != nil {
return 0, err
}
}
return ref, nil
}
func (f *fanoutAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
ref, err := f.primary.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
if err != nil {


@ -265,6 +265,12 @@ type Appender interface {
// If the reference is 0 it must not be used for caching.
Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)
// AppendWithCT is like Append, but also stores an optional CT alongside the sample.
// A ct equal to 0 means no CT.
// TODO(bwplotka): Consider adding CT (with 0 being empty) to Append once
// this mechanism is proven.
AppendWithCT(ref SeriesRef, l labels.Labels, t, ct int64, v float64) (SeriesRef, error)
// Commit submits the collected samples and purges the batch. If Commit
// returns a non-nil error, it also rolls back all modifications made in
// the appender so far, as Rollback would do. In any case, an Appender
@ -319,13 +325,19 @@ type HistogramAppender interface {
// reference number is returned which can be used to add further
// histograms in the same or later transactions. Returned reference
// numbers are ephemeral and may be rejected in calls to Append() at any
// point. Adding the sample via Append() returns a new reference number.
// point. Adding the sample via AppendHistogram() returns a new reference number.
// If the reference is 0, it must not be used for caching.
//
// For efficiency reasons, the histogram is passed as a
// pointer. AppendHistogram won't mutate the histogram, but in turn
// depends on the caller to not mutate it either.
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
// AppendHistogramWithCT is like AppendHistogram, but also stores a CT alongside the sample.
// TODO(bwplotka): Consider adding CT (with 0 being empty) to AppendHistogram once
// this mechanism is proven.
AppendHistogramWithCT(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
// AppendHistogramCTZeroSample adds synthetic zero sample for the given ct timestamp,
// which will be associated with given series, labels and the incoming
// sample's t (timestamp). AppendHistogramCTZeroSample returns error if zero sample can't be
@ -355,6 +367,8 @@ type MetadataUpdater interface {
}
// CreatedTimestampAppender provides an interface for appending CT to storage.
// TODO(bwplotka): Consider adding histogram CT zero sample methods here for consistency.
// TODO(bwplotka): This might be removed at some point, superseded by https://github.com/prometheus/prometheus/issues/14218
type CreatedTimestampAppender interface {
// AppendCTZeroSample adds synthetic zero sample for the given ct timestamp,
// which will be associated with given series, labels and the incoming


@ -730,11 +730,12 @@ outer:
default:
}
if t.shards.enqueue(s.Ref, timeSeries{
seriesLabels: lbls,
metadata: meta,
timestamp: s.T,
value: s.V,
sType: tSample,
seriesLabels: lbls,
metadata: meta,
timestamp: s.T,
createdTimestamp: s.CT,
value: s.V,
sType: tSample,
}) {
continue outer
}
@ -1351,13 +1352,14 @@ type queue struct {
}
type timeSeries struct {
seriesLabels labels.Labels
value float64
histogram *histogram.Histogram
floatHistogram *histogram.FloatHistogram
metadata *metadata.Metadata
timestamp int64
exemplarLabels labels.Labels
seriesLabels labels.Labels
value float64
histogram *histogram.Histogram
floatHistogram *histogram.FloatHistogram
metadata *metadata.Metadata
timestamp int64
createdTimestamp int64
exemplarLabels labels.Labels
// The type of series: sample, exemplar, or histogram.
sType seriesType
}
@ -1949,6 +1951,7 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
Value: d.value,
Timestamp: d.timestamp,
})
pendingData[nPending].CreatedTimestamp = d.createdTimestamp
nPendingSamples++
case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, writev2.Exemplar{


@ -294,6 +294,14 @@ func (t *timestampTracker) Append(_ storage.SeriesRef, _ labels.Labels, ts int64
return 0, nil
}
func (t *timestampTracker) AppendWithCT(_ storage.SeriesRef, _ labels.Labels, ts, _ int64, _ float64) (storage.SeriesRef, error) {
t.samples++
if ts > t.highestTimestamp {
t.highestTimestamp = ts
}
return 0, nil
}
func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) {
t.exemplars++
return 0, nil
@ -307,6 +315,14 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels,
return 0, nil
}
func (t *timestampTracker) AppendHistogramWithCT(_ storage.SeriesRef, _ labels.Labels, ts, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
t.histograms++
if ts > t.highestTimestamp {
t.highestTimestamp = ts
}
return 0, nil
}
func (t *timestampTracker) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, ct int64) (storage.SeriesRef, error) {
t.samples++
if ct > t.highestTimestamp {


@ -673,11 +673,15 @@ type timeLimitAppender struct {
}
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return app.AppendWithCT(ref, lset, t, 0, v)
}
func (app *timeLimitAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
if t > app.maxTime {
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
}
ref, err := app.Appender.Append(ref, lset, t, v)
ref, err := app.Appender.AppendWithCT(ref, lset, t, ct, v)
if err != nil {
return 0, err
}
@ -685,11 +689,15 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels,
}
func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return app.AppendHistogramWithCT(ref, l, t, 0, h, fh)
}
func (app *timeLimitAppender) AppendHistogramWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if t > app.maxTime {
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
}
ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh)
ref, err := app.Appender.AppendHistogramWithCT(ref, l, t, ct, h, fh)
if err != nil {
return 0, err
}


@ -271,7 +271,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) {
for _, ts := range writeRequestFixture.Timeseries {
labels := ts.ToLabels(&b, nil)
for _, s := range ts.Samples {
requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
requireEqual(t, mockSample{labels, s.Timestamp, 0, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
@ -282,10 +282,10 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) {
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := hp.ToFloatHistogram()
requireEqual(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
requireEqual(t, mockHistogram{labels, hp.Timestamp, 0, nil, fh}, appendable.histograms[k])
} else {
h := hp.ToIntHistogram()
requireEqual(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
requireEqual(t, mockHistogram{labels, hp.Timestamp, 0, h, nil}, appendable.histograms[k])
}
k++
@ -500,27 +500,27 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
for _, s := range ts.Samples {
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0}, appendable.samples[i])
requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0, 0}, appendable.samples[i])
i++
}
requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
requireEqual(t, mockSample{ls, s.Timestamp, 0, s.Value}, appendable.samples[i])
i++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := hp.ToFloatHistogram()
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, nil, &histogram.FloatHistogram{}}, appendable.histograms[k])
requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, 0, nil, &histogram.FloatHistogram{}}, appendable.histograms[k])
k++
}
requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
requireEqual(t, mockHistogram{ls, hp.Timestamp, 0, nil, fh}, appendable.histograms[k])
} else {
h := hp.ToIntHistogram()
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, &histogram.Histogram{}, nil}, appendable.histograms[k])
requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, 0, &histogram.Histogram{}, nil}, appendable.histograms[k])
k++
}
requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
requireEqual(t, mockHistogram{ls, hp.Timestamp, 0, h, nil}, appendable.histograms[k])
}
k++
}
@ -811,9 +811,9 @@ type mockAppendable struct {
}
type mockSample struct {
l labels.Labels
t int64
v float64
l labels.Labels
t, ct int64
v float64
}
type mockExemplar struct {
@ -824,10 +824,10 @@ type mockExemplar struct {
}
type mockHistogram struct {
l labels.Labels
t int64
h *histogram.Histogram
fh *histogram.FloatHistogram
l labels.Labels
t, ct int64
h *histogram.Histogram
fh *histogram.FloatHistogram
}
type mockMetadata struct {
@ -864,7 +864,11 @@ func (m *mockAppendable) SetOptions(_ *storage.AppendOptions) {
panic("unimplemented")
}
func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
func (m *mockAppendable) Append(r storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return m.AppendWithCT(r, l, t, 0, v)
}
func (m *mockAppendable) AppendWithCT(_ storage.SeriesRef, l labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
if m.appendSampleErr != nil {
return 0, m.appendSampleErr
}
@ -885,7 +889,7 @@ func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v
}
m.latestSample[l.Hash()] = t
m.samples = append(m.samples, mockSample{l, t, v})
m.samples = append(m.samples, mockSample{l, t, ct, v})
return 0, nil
}
@ -922,7 +926,11 @@ func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e
return 0, nil
}
func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
func (m *mockAppendable) AppendHistogram(r storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return m.AppendHistogramWithCT(r, l, t, 0, h, fh)
}
func (m *mockAppendable) AppendHistogramWithCT(_ storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if m.appendHistogramErr != nil {
return 0, m.appendHistogramErr
}
@ -952,7 +960,7 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
} else {
m.latestFloatHist[l.Hash()] = t
}
m.histograms = append(m.histograms, mockHistogram{l, t, h, fh})
m.histograms = append(m.histograms, mockHistogram{l, t, ct, h, fh})
return 0, nil
}
@ -989,10 +997,10 @@ func (m *mockAppendable) AppendHistogramCTZeroSample(_ storage.SeriesRef, l labe
if h != nil {
m.latestHistogram[l.Hash()] = ct
m.histograms = append(m.histograms, mockHistogram{l, ct, &histogram.Histogram{}, nil})
m.histograms = append(m.histograms, mockHistogram{l, ct, 0, &histogram.Histogram{}, nil})
} else {
m.latestFloatHist[l.Hash()] = ct
m.histograms = append(m.histograms, mockHistogram{l, ct, nil, &histogram.FloatHistogram{}})
m.histograms = append(m.histograms, mockHistogram{l, ct, 0, nil, &histogram.FloatHistogram{}})
}
return 0, nil
}
@ -1032,6 +1040,6 @@ func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, l labels.Labels
}
m.latestSample[l.Hash()] = ct
m.samples = append(m.samples, mockSample{l, ct, 0})
m.samples = append(m.samples, mockSample{l, ct, 0, 0})
return 0, nil
}


@ -839,13 +839,21 @@ func (s syncAppendable) Appender(ctx context.Context) storage.Appender {
}
func (s syncAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return s.AppendWithCT(ref, l, t, 0, v)
}
func (s syncAppender) AppendWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.Appender.Append(ref, l, t, v)
return s.Appender.AppendWithCT(ref, l, t, ct, v)
}
func (s syncAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, f *histogram.FloatHistogram) (storage.SeriesRef, error) {
return s.AppendHistogramWithCT(ref, l, t, 0, h, f)
}
func (s syncAppender) AppendHistogramWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, f *histogram.FloatHistogram) (storage.SeriesRef, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.Appender.AppendHistogram(ref, l, t, h, f)
return s.Appender.AppendHistogramWithCT(ref, l, t, ct, h, f)
}


@ -222,7 +222,9 @@ func (m *dbMetrics) Unregister() {
}
}
// DB represents a WAL-only storage. It implements storage.DB.
var _ storage.Appendable = &DB{}
// DB represents a WAL-only storage.
type DB struct {
mtx sync.RWMutex
logger *slog.Logger
@ -452,7 +454,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return
}
decoded <- series
case record.Samples:
case record.Samples, record.SamplesWithCT:
samples := db.walReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
@ -762,6 +764,8 @@ func (db *DB) Close() error {
return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err()
}
var _ storage.Appender = &appender{}
type appender struct {
*DB
hints *storage.AppendOptions
@ -790,6 +794,10 @@ func (a *appender) SetOptions(opts *storage.AppendOptions) {
}
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return a.AppendWithCT(ref, l, t, 0, v)
}
func (a *appender) AppendWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
// series references and chunk references are identical for agent mode.
headRef := chunks.HeadSeriesRef(ref)
@ -826,11 +834,16 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
return 0, storage.ErrOutOfOrderSample
}
if ct != 0 && ct > t {
ct = 0
}
// NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: series.ref,
T: t,
V: v,
CT: ct,
})
a.sampleSeries = append(a.sampleSeries, series)
@ -905,6 +918,12 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem
return storage.SeriesRef(s.ref), nil
}
func (a *appender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, _ int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO(bwplotka): Add support for native histograms with CTs in WAL; add/consolidate records.
// We ignore CT for now.
return a.AppendHistogram(ref, lset, t, h, fh)
}
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
if err := h.Validate(); err != nil {


@ -224,7 +224,7 @@ func TestCommit(t *testing.T) {
require.NoError(t, err)
walSeriesCount += len(series)
case record.Samples:
case record.Samples, record.SamplesWithCT:
var samples []record.RefSample
samples, err = dec.Samples(rec, samples)
require.NoError(t, err)
@ -357,7 +357,7 @@ func TestRollback(t *testing.T) {
require.NoError(t, err)
walSeriesCount += len(series)
case record.Samples:
case record.Samples, record.SamplesWithCT:
var samples []record.RefSample
samples, err = dec.Samples(rec, samples)
require.NoError(t, err)
@ -1349,7 +1349,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
series, err := dec.Series(rec, nil)
require.NoError(t, err)
lastSeries = series[0]
case record.Samples:
case record.Samples, record.SamplesWithCT:
samples, err = dec.Samples(rec, samples[:0])
require.NoError(t, err)
for _, s := range samples {


@ -4497,7 +4497,7 @@ func testOOOWALWrite(t *testing.T,
series, err := dec.Series(rec, nil)
require.NoError(t, err)
records = append(records, series)
case record.Samples:
case record.Samples, record.SamplesWithCT:
samples, err := dec.Samples(rec, nil)
require.NoError(t, err)
records = append(records, samples)


@ -56,6 +56,16 @@ func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
return a.app.Append(ref, lset, t, v)
}
func (a *initAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.AppendWithCT(ref, lset, t, ct, v)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendWithCT(ref, lset, t, ct, v)
}
func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
// Check if exemplar storage is enabled.
if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
@ -77,19 +87,29 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t
if a.app != nil {
return a.app.AppendHistogram(ref, l, t, h, fh)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendHistogram(ref, l, t, h, fh)
}
func (a *initAppender) AppendHistogramWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.AppendHistogramWithCT(ref, l, t, ct, h, fh)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendHistogramWithCT(ref, l, t, ct, h, fh)
}
func (a *initAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}
@ -109,7 +129,6 @@ func (a *initAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendCTZeroSample(ref, lset, t, ct)
}
@ -340,6 +359,10 @@ func (a *headAppender) SetOptions(opts *storage.AppendOptions) {
}
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return a.AppendWithCT(ref, lset, t, 0, v)
}
func (a *headAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) {
// Fail fast if OOO is disabled and the sample is out of bounds.
// Otherwise a full check will be done later to decide if the sample is in-order or out-of-order.
if a.oooTimeWindow == 0 && t < a.minValidTime {
@ -401,11 +424,17 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
if t > a.maxt {
a.maxt = t
}
if ct != 0 && ct > t {
// TODO(bwplotka): CT is in the future, so it's invalid; ignore it. Add a metric to report this event.
// WARN FOR PR experiment only
slog.Warn("got CT in future?", "ct", ct, "t", t, "lset", lset.String())
ct = 0
}
a.samples = append(a.samples, record.RefSample{
Ref: s.ref,
T: t,
V: v,
CT: ct,
})
a.sampleSeries = append(a.sampleSeries, s)
return storage.SeriesRef(s.ref), nil
@ -413,7 +442,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
// AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns
// error when sample can't be appended. See
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
// storage.WithCTAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
if ct >= t {
return 0, storage.ErrCTNewerThanSample
@ -447,7 +476,7 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
if ct > a.maxt {
a.maxt = ct
}
a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0, CT: ct})
a.sampleSeries = append(a.sampleSeries, s)
return storage.SeriesRef(s.ref), nil
}
@ -647,6 +676,12 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO(bwplotka): Add support for native histograms with CTs in WAL; add/consolidate records.
// We ignore CT for now.
return a.AppendHistogram(ref, lset, t, h, fh)
}
func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if !a.head.opts.EnableNativeHistograms.Load() {
return 0, storage.ErrNativeHistogramsDisabled


@ -179,12 +179,12 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
switch typ := dec.Type(rec); typ {
case record.Series:
series, err := dec.Series(rec, nil)
require.NoError(t, err)
recs = append(recs, series)
case record.Samples:
case record.Samples, record.SamplesWithCT:
samples, err := dec.Samples(rec, nil)
require.NoError(t, err)
recs = append(recs, samples)
@ -209,7 +209,7 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
require.NoError(t, err)
recs = append(recs, exemplars)
default:
require.Fail(t, "unknown record type")
require.Fail(t, "unknown record type", typ)
}
}
require.NoError(t, r.Err())
@ -6295,15 +6295,14 @@ func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing
require.ErrorIs(t, err, storage.NewDuplicateHistogramToFloatErr(2_000, 10.0))
}
func TestHeadAppender_AppendCT(t *testing.T) {
func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
testHistogram := tsdbutil.GenerateTestHistogram(1)
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
type appendableSamples struct {
ts int64
ts, ct int64
fSample float64
h *histogram.Histogram
fh *histogram.FloatHistogram
ct int64
}
for _, tc := range []struct {
name string
@ -6516,6 +6515,82 @@ func TestHeadAppender_AppendCT(t *testing.T) {
}
}
func TestHeadAppender_AppendWithCT(t *testing.T) {
type appendableSamples struct {
t, ct int64
v float64
}
for _, tc := range []struct {
name string
appendableSamples []appendableSamples
expectedSamples []record.RefSample
}{
{
name: "sample with non-zero ct",
appendableSamples: []appendableSamples{
{t: 100, v: 10, ct: 1},
},
expectedSamples: []record.RefSample{
{T: 100, V: 10, CT: 1, Ref: 1},
},
},
{
name: "sample with ct > t are ignored",
appendableSamples: []appendableSamples{
{t: 100, v: 10, ct: 101},
},
expectedSamples: []record.RefSample{
{T: 100, V: 10, Ref: 1},
},
},
{
name: "multiple samples + same ct",
appendableSamples: []appendableSamples{
{t: 100, v: 10, ct: 1},
{t: 101, v: 10, ct: 1},
},
expectedSamples: []record.RefSample{
{T: 100, V: 10, CT: 1, Ref: 1},
{T: 101, V: 10, CT: 1, Ref: 1},
},
},
{
name: "multiple samples + different ct",
appendableSamples: []appendableSamples{
{t: 100, v: 10, ct: 1},
{t: 102, v: 10, ct: 101},
},
expectedSamples: []record.RefSample{
{T: 100, V: 10, CT: 1, Ref: 1},
{T: 102, V: 10, CT: 101, Ref: 1},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
h, w := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
defer func() {
require.NoError(t, h.Close())
}()
a := h.Appender(context.Background())
lbls := labels.FromStrings("foo", "bar")
for _, sample := range tc.appendableSamples {
_, err := a.AppendWithCT(0, lbls, sample.t, sample.ct, sample.v)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
recs := readTestWAL(t, w.Dir())
_, ok := recs[0].([]record.RefSeries)
require.True(t, ok, "expected first record to be a RefSeries")
actualType := reflect.TypeOf(recs[1])
require.Equal(t, reflect.TypeOf([]record.RefSample{}), actualType, "expected second record to be a record.RefSample")
require.Equal(t, tc.expectedSamples, recs[1])
})
}
}
func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
// Use a chunk range of 1 here so that if we attempted to determine if the head
// was compactable using default values for min and max times, `Head.compactable()`


@ -142,7 +142,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
return
}
decoded <- series
case record.Samples:
case record.Samples, record.SamplesWithCT:
samples := h.wlReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
@ -686,7 +686,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
var err error
rec := r.Record()
switch dec.Type(rec) {
case record.Samples:
case record.Samples, record.SamplesWithCT:
samples := h.wlReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {


@ -56,6 +56,8 @@ const (
CustomBucketsHistogramSamples Type = 9
// CustomBucketsFloatHistogramSamples is used to match WAL records of type Float Histogram with custom buckets.
CustomBucketsFloatHistogramSamples Type = 10
// SamplesWithCT is an enhanced sample record that allows storing an optional CT per sample.
SamplesWithCT Type = 11
)
func (rt Type) String() string {
@ -64,6 +66,8 @@ func (rt Type) String() string {
return "series"
case Samples:
return "samples"
case SamplesWithCT:
return "samples-with-ct"
case Tombstones:
return "tombstones"
case Exemplars:
@ -155,12 +159,12 @@ type RefSeries struct {
Labels labels.Labels
}
// RefSample is a timestamp/value pair associated with a reference to a series.
// RefSample is a timestamp/ct/value struct associated with a reference to a series.
// TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample.
type RefSample struct {
Ref chunks.HeadSeriesRef
T int64
V float64
Ref chunks.HeadSeriesRef
T, CT int64
V float64
}
// RefMetadata is the metadata associated with a series ID.
@ -180,6 +184,7 @@ type RefExemplar struct {
}
// RefHistogramSample is a histogram.
// TODO(bwplotka): Add support for CT.
type RefHistogramSample struct {
Ref chunks.HeadSeriesRef
T int64
@ -187,6 +192,7 @@ type RefHistogramSample struct {
}
// RefFloatHistogramSample is a float histogram.
// TODO(bwplotka): Add support for CT.
type RefFloatHistogramSample struct {
Ref chunks.HeadSeriesRef
T int64
@ -215,7 +221,9 @@ func (d *Decoder) Type(rec []byte) Type {
return Unknown
}
switch t := Type(rec[0]); t {
case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomBucketsHistogramSamples, CustomBucketsFloatHistogramSamples:
case Series, Samples, SamplesWithCT, Tombstones, Exemplars,
MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples,
CustomBucketsHistogramSamples, CustomBucketsFloatHistogramSamples:
return t
}
return Unknown
@ -304,10 +312,17 @@ func (d *Decoder) DecodeLabels(dec *encoding.Decbuf) labels.Labels {
// Samples appends samples in rec to the given slice.
func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
dec := encoding.Decbuf{B: rec}
if Type(dec.Byte()) != Samples {
return nil, errors.New("invalid record type")
switch typ := dec.Byte(); Type(typ) {
case Samples:
return d.samples(&dec, samples)
case SamplesWithCT:
return d.samplesWithCT(&dec, samples)
default:
return nil, fmt.Errorf("invalid record type %v, expected Samples(2) or SamplesWithCT(11)", typ)
}
}
func (d *Decoder) samples(dec *encoding.Decbuf, samples []RefSample) ([]RefSample, error) {
if dec.Len() == 0 {
return samples, nil
}
@ -340,6 +355,42 @@ func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error)
return samples, nil
}
func (d *Decoder) samplesWithCT(dec *encoding.Decbuf, samples []RefSample) ([]RefSample, error) {
if dec.Len() == 0 {
return samples, nil
}
var (
baseRef = dec.Be64()
baseTime = dec.Be64int64()
baseCT = dec.Be64int64()
)
// Allow 1 byte for each of the three varints (ref, t and ct deltas) and 8 for the value;
// the output slice must be at least that big.
if minSize := dec.Len() / (1 + 1 + 1 + 8); cap(samples) < minSize {
samples = make([]RefSample, 0, minSize)
}
for len(dec.B) > 0 && dec.Err() == nil {
dref := dec.Varint64()
dtime := dec.Varint64()
dCT := dec.Varint64()
val := dec.Be64()
samples = append(samples, RefSample{
Ref: chunks.HeadSeriesRef(int64(baseRef) + dref),
T: baseTime + dtime,
CT: baseCT + dCT,
V: math.Float64frombits(val),
})
}
if dec.Err() != nil {
return nil, fmt.Errorf("decode error after %d samples: %w", len(samples), dec.Err())
}
if len(dec.B) > 0 {
return nil, fmt.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
return samples, nil
}
// Tombstones appends tombstones in rec to the given slice.
func (d *Decoder) Tombstones(rec []byte, tstones []tombstones.Stone) ([]tombstones.Stone, error) {
dec := encoding.Decbuf{B: rec}
@ -665,7 +716,17 @@ func EncodeLabels(buf *encoding.Encbuf, lbls labels.Labels) {
}
// Samples appends the encoded samples to b and returns the resulting slice.
// Depending on whether any sample has a CT, it writes either a Samples or a SamplesWithCT record.
func (e *Encoder) Samples(samples []RefSample, b []byte) []byte {
for _, s := range samples {
if s.CT != 0 {
return e.samplesWithCT(samples, b)
}
}
return e.samples(samples, b)
}
func (e *Encoder) samples(samples []RefSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(Samples))
@ -688,6 +749,32 @@ func (e *Encoder) Samples(samples []RefSample, b []byte) []byte {
return buf.Get()
}
func (e *Encoder) samplesWithCT(samples []RefSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(SamplesWithCT))
if len(samples) == 0 {
return buf.Get()
}
// Store base timestamp, base CT and base reference number of first sample.
// All samples encode their timestamp, CT and ref as delta to those.
// TODO(ridwanmsharif): Should the timestamp be encoded as a delta with the CT?
first := samples[0]
buf.PutBE64(uint64(first.Ref))
buf.PutBE64int64(first.T)
buf.PutBE64int64(first.CT)
for _, s := range samples {
buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
buf.PutVarint64(s.T - first.T)
buf.PutVarint64(s.CT - first.CT)
buf.PutBE64(math.Float64bits(s.V))
}
return buf.Get()
}
// Tombstones appends the encoded tombstones to b and returns the resulting slice.
func (e *Encoder) Tombstones(tstones []tombstones.Stone, b []byte) []byte {
buf := encoding.Encbuf{B: b}


@ -84,6 +84,15 @@ func TestRecord_EncodeDecode(t *testing.T) {
require.NoError(t, err)
require.Equal(t, samples, decSamples)
samplesWithCT := []RefSample{
{Ref: 0, T: 12423423, CT: 14, V: 1.2345},
{Ref: 123, T: -1231, CT: 14, V: -123},
{Ref: 2, T: 0, CT: 14, V: 99999},
}
decSamplesWithCT, err := dec.Samples(enc.Samples(samplesWithCT, nil), nil)
require.NoError(t, err)
require.Equal(t, samplesWithCT, decSamplesWithCT)
// Intervals get split up into single entries. So we don't get back exactly
// what we put in.
tstones := []tombstones.Stone{


@ -191,7 +191,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
stats.TotalSeries += len(series)
stats.DroppedSeries += len(series) - len(repl)
case record.Samples:
case record.Samples, record.SamplesWithCT:
samples, err = dec.Samples(rec, samples)
if err != nil {
return nil, fmt.Errorf("decode samples: %w", err)


@ -323,7 +323,7 @@ func TestCheckpoint(t *testing.T) {
case record.Series:
series, err = dec.Series(rec, series)
require.NoError(t, err)
case record.Samples:
case record.Samples, record.SamplesWithCT:
samples, err := dec.Samples(rec, nil)
require.NoError(t, err)
for _, s := range samples {


@ -503,7 +503,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
}
w.writer.StoreSeries(series, segmentNum)
case record.Samples:
case record.Samples, record.SamplesWithCT:
// If we're not tailing a segment we can ignore any samples records we see.
// This speeds up replay of the WAL by > 10x.
if !tail {