From aee78bdb319fb8c99d294bfe1ddec00bac99b511 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Thu, 24 Oct 2024 18:19:20 +0000 Subject: [PATCH] ct: Support CTs in WAL; change sample record; use in PRW 2.0 Fixes https://github.com/prometheus/prometheus/issues/14218 and https://github.com/prometheus/prometheus/issues/14220 Rebased version of https://github.com/prometheus/prometheus/pull/15254 with improvements. This change does the following: - Change appender interface to be CT aware (optional CT) - Add created-timestamp-per-sample feature flag - Add new sample record used only if CT is appended with the sample. - Remote Write awareness of CT. Signed-off-by: Ridwan Sharif Signed-off-by: bwplotka # Conflicts: # cmd/prometheus/main.go # scrape/helpers_test.go # storage/remote/write_handler_test.go --- cmd/prometheus/main.go | 21 ++++++ docs/feature_flags.md | 43 +++++++++-- scrape/helpers_test.go | 29 ++++++-- scrape/manager.go | 16 +++-- scrape/scrape.go | 18 +++-- scrape/scrape_test.go | 9 ++- scrape/target.go | 24 +++++-- storage/fanout.go | 28 ++++++++ storage/interface.go | 16 ++++- storage/remote/queue_manager.go | 27 +++---- storage/remote/write.go | 16 +++++ storage/remote/write_handler.go | 12 +++- storage/remote/write_handler_test.go | 54 ++++++++------ storage/remote/write_test.go | 12 +++- tsdb/agent/db.go | 23 +++++- tsdb/agent/db_test.go | 6 +- tsdb/db_test.go | 2 +- tsdb/head_append.go | 47 ++++++++++-- tsdb/head_test.go | 87 ++++++++++++++++++++-- tsdb/head_wal.go | 4 +- tsdb/record/record.go | 103 ++++++++++++++++++++++++--- tsdb/record/record_test.go | 9 +++ tsdb/wlog/checkpoint.go | 2 +- tsdb/wlog/checkpoint_test.go | 2 +- tsdb/wlog/watcher.go | 2 +- 25 files changed, 517 insertions(+), 95 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4559d51837..d9f7c51a50 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -257,9 +257,22 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "ooo-native-histograms": c.tsdb.EnableOOONativeHistograms = true logger.Info("Experimental out-of-order native histogram ingestion enabled. This will only take effect if OutOfOrderTimeWindow is > 0 and if EnableNativeHistograms = true") + case "created-timestamp-per-sample": + c.scrape.EnableCreatedTimestampPerSample = true + // TODO(bwplotka): Add support for CT per sample in: + // * Native histogram WAL records. + // * PRW and OTLP receiving + // * PromQL engine (accessing from WAL) + // * TSDB storage + + // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. + config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols + config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols + logger.Info("Experimental created timestamp per sample enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) case "created-timestamp-zero-ingestion": c.scrape.EnableCreatedTimestampZeroIngestion = true c.web.CTZeroIngestionEnabled = true + // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols @@ -1644,6 +1657,10 @@ func (n notReadyAppender) Append(_ storage.SeriesRef, _ labels.Labels, _ int64, return 0, tsdb.ErrNotReady } +func (n notReadyAppender) AppendWithCT(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ float64) (storage.SeriesRef, error) { + return 0, tsdb.ErrNotReady +} + func (n notReadyAppender) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) { return 0, tsdb.ErrNotReady } @@ -1652,6 +1669,10 @@ func (n notReadyAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, return 0, tsdb.ErrNotReady } +func (n notReadyAppender) AppendHistogramWithCT(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, tsdb.ErrNotReady +} + func (n notReadyAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { return 0, tsdb.ErrNotReady } diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 6973d6d73b..00a7578020 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -79,15 +79,50 @@ Enables PromQL functions that are considered experimental. These functions might change their name, syntax, or semantics. They might also get removed entirely. +## Created Timestamps per sample + +`--enable-feature=created-timestamp-per-sample` + +When enabled, Prometheus will store created timestamps (CT) per sample, which +allows persisting CTs across restarts in the WAL, block storage, +as well as using it in Remote Write 2.0 and PromQL engine. + +This must be used if you would like to send created timestamps using the new remote write 2.0. + +Important: Enhanced sample records are only readable by v3.3+ Prometheus versions. + +### Current State + +This flag intends to cover all experimental features related to created timestamps +per sample approach. + +At the moment, only WAL storage is implemented. CT is stored per sample in a +new samples WAL record. CTs are used for Remote Write 2.0 endpoints if configured. + +In the future, we intend to enable Prometheus PromQL to use CTs in sample WAL records, +as well as storing it in TSDB block storage. + +Enable the `created-timestamp-zero-ingestion` explained below, if you wish to have PromQL +aware of CTs in a form of synthetic zero samples, with all its consequences. + +### Client Considerations + +Currently, Prometheus supports created timestamps only on the traditional +Prometheus Protobuf protocol. As a result, when enabling +this feature, the Prometheus protobuf scrape protocol will be prioritized +(See `scrape_config.scrape_protocols` settings for more details). Community is +working on [accessible CTs through OpenMetrics 2.0 too](). + +Besides enabling this feature in Prometheus, CT need to be exposed by the application being scraped. + ## Created Timestamps Zero Injection `--enable-feature=created-timestamp-zero-ingestion` -Enables ingestion of created timestamp. Created timestamps are injected as 0 valued samples when appropriate. See [PromCon talk](https://youtu.be/nWf0BfQ5EEA) for details. +Enables ingestion of created timestamp (CT). Created timestamps are injected as +0 valued samples when appropriate. See [PromCon talk](https://youtu.be/nWf0BfQ5EEA) for details. -Currently Prometheus supports created timestamps only on the traditional Prometheus Protobuf protocol (WIP for other protocols). As a result, when enabling this feature, the Prometheus protobuf scrape protocol will be prioritized (See `scrape_config.scrape_protocols` settings for more details). - -Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped. +The [client considerations for CTs](#client-considerations) also applies here. ## Concurrent evaluation of independent rules diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index fcef695385..b45198a31b 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -49,6 +49,10 @@ func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (s return 1, nil } +func (a nopAppender) AppendWithCT(storage.SeriesRef, labels.Labels, int64, int64, float64) (storage.SeriesRef, error) { + return 6, nil +} + func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) { return 2, nil } @@ -57,7 +61,11 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h return 3, nil } -func (a nopAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { +func (a nopAppender) AppendHistogramWithCT(storage.SeriesRef, labels.Labels, int64, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 7, nil +} + +func (a nopAppender) AppendHistogramCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) { return 0, nil } @@ -74,7 +82,7 @@ func (a nopAppender) Rollback() error { return nil } type floatSample struct { metric labels.Labels - t int64 + ct, t int64 f float64 } @@ -85,7 +93,7 @@ func equalFloatSamples(a, b floatSample) bool { type histogramSample struct { metric labels.Labels - t int64 + ct, t int64 h *histogram.Histogram fh *histogram.FloatHistogram } @@ -140,10 +148,15 @@ type collectResultAppender struct { func (a *collectResultAppender) SetOptions(_ *storage.AppendOptions) {} func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return a.AppendWithCT(ref, lset, t, 0, v) +} + +func (a *collectResultAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { a.mtx.Lock() defer a.mtx.Unlock() a.pendingFloats = append(a.pendingFloats, floatSample{ metric: lset, + ct: ct, t: t, f: v, }) @@ -155,7 +168,7 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels return ref, nil } - ref, err := a.next.Append(ref, lset, t, v) + ref, err := a.next.AppendWithCT(ref, lset, t, ct, v) if err != nil { return 0, err } @@ -174,14 +187,18 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L } func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return a.AppendHistogramWithCT(ref, l, t, 0, h, fh) +} + +func (a *collectResultAppender) AppendHistogramWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { a.mtx.Lock() defer a.mtx.Unlock() - a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, metric: l}) + a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, ct: ct, metric: l}) if a.next == nil { return 0, nil } - return a.next.AppendHistogram(ref, l, t, h, fh) + return a.next.AppendHistogramWithCT(ref, l, t, ct, h, fh) } func (a *collectResultAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, _, ct int64, h *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { diff --git a/scrape/manager.go b/scrape/manager.go index 5ef5dccb99..e4ef559250 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -81,17 +81,23 @@ type Options struct { AppendMetadata bool // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration - // Option to enable the ingestion of the created timestamp as a synthetic zero sample. - // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md - EnableCreatedTimestampZeroIngestion bool - // Option to enable the ingestion of native histograms. - EnableNativeHistogramsIngestion bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption // private option for testability. skipOffsetting bool + + // ---Feature flags: + + // EnableNativeHistogramsIngestion enables the ingestion of native histograms. + EnableNativeHistogramsIngestion bool + // EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample. + // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md + EnableCreatedTimestampZeroIngestion bool + // EnableCreatedTimestampPerSample enables the ingestion of the created timestamp per sample, + // through the TSDB AppendWithCT and AppendHistogramWithCT methods. + EnableCreatedTimestampPerSample bool } // Manager maintains a set of scrape pools and manages start/stop cycles diff --git a/scrape/scrape.go b/scrape/scrape.go index 2cd3d78d53..0e51d04a0a 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -195,6 +195,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed opts.convertClassicHistToNHCB, options.EnableNativeHistogramsIngestion, options.EnableCreatedTimestampZeroIngestion, + options.EnableCreatedTimestampPerSample, options.ExtraMetrics, options.AppendMetadata, opts.target, @@ -916,6 +917,7 @@ type scrapeLoop struct { // Feature flagged options. enableNativeHistogramIngestion bool enableCTZeroIngestion bool + enableCTPerSample bool appender func(ctx context.Context) storage.Appender symbolTable *labels.SymbolTable @@ -1223,6 +1225,7 @@ func newScrapeLoop(ctx context.Context, convertClassicHistToNHCB bool, enableNativeHistogramIngestion bool, enableCTZeroIngestion bool, + enableCTPerSample bool, reportExtraMetrics bool, appendMetadataToWAL bool, target *Target, @@ -1279,6 +1282,7 @@ func newScrapeLoop(ctx context.Context, convertClassicHistToNHCB: convertClassicHistToNHCB, enableNativeHistogramIngestion: enableNativeHistogramIngestion, enableCTZeroIngestion: enableCTZeroIngestion, + enableCTPerSample: enableCTPerSample, reportExtraMetrics: reportExtraMetrics, appendMetadataToWAL: appendMetadataToWAL, metrics: metrics, @@ -1746,6 +1750,12 @@ loop: if seriesAlreadyScraped && parsedTimestamp == nil { err = storage.ErrDuplicateSampleForTimestamp } else { + var ct int64 + if sl.enableCTPerSample { + if ctMs := p.CreatedTimestamp(); ctMs != nil { + ct = *ctMs + } + } if sl.enableCTZeroIngestion { if ctMs := p.CreatedTimestamp(); ctMs != nil { if isHistogram && sl.enableNativeHistogramIngestion { @@ -1760,19 +1770,19 @@ loop: if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. // CT is an experimental feature. For now, we don't need to fail the // scrape on errors updating the created timestamp, log debug. - sl.l.Debug("Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) + sl.l.Debug("Error when appending CT in scrape loop", "series", string(met), "ct", ct, "t", t, "err", err) } } } if isHistogram && sl.enableNativeHistogramIngestion { if h != nil { - ref, err = app.AppendHistogram(ref, lset, t, h, nil) + ref, err = app.AppendHistogramWithCT(ref, lset, t, ct, h, nil) } else { - ref, err = app.AppendHistogram(ref, lset, t, nil, fh) + ref, err = app.AppendHistogramWithCT(ref, lset, t, ct, nil, fh) } } else { - ref, err = app.Append(ref, lset, t, val) + ref, err = app.AppendWithCT(ref, lset, t, ct, val) } } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 6bb140cca8..dfe499b246 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -958,6 +958,7 @@ func newBasicScrapeLoopWithFallback(t testing.TB, ctx context.Context, scraper s false, false, false, + false, true, nil, false, @@ -1105,6 +1106,7 @@ func TestScrapeLoopRun(t *testing.T) { false, false, false, + false, nil, false, scrapeMetrics, @@ -1252,6 +1254,7 @@ func TestScrapeLoopMetadata(t *testing.T) { false, false, false, + false, nil, false, scrapeMetrics, @@ -2815,6 +2818,10 @@ type errorAppender struct { } func (app *errorAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return app.AppendWithCT(ref, lset, t, 0, v) +} + +func (app *errorAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { switch lset.Get(model.MetricNameLabel) { case "out_of_order": return 0, storage.ErrOutOfOrderSample @@ -2823,7 +2830,7 @@ func (app *errorAppender) Append(ref storage.SeriesRef, lset labels.Labels, t in case "out_of_bounds": return 0, storage.ErrOutOfBounds default: - return app.collectResultAppender.Append(ref, lset, t, v) + return app.collectResultAppender.AppendWithCT(ref, lset, t, ct, v) } } diff --git a/scrape/target.go b/scrape/target.go index 4f576504f0..a9afb5c03b 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -332,13 +332,17 @@ type limitAppender struct { } func (app *limitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return app.AppendWithCT(ref, lset, t, 0, v) +} + +func (app *limitAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { if !value.IsStaleNaN(v) { app.i++ if app.i > app.limit { return 0, errSampleLimit } } - ref, err := app.Appender.Append(ref, lset, t, v) + ref, err := app.Appender.AppendWithCT(ref, lset, t, ct, v) if err != nil { return 0, err } @@ -352,11 +356,15 @@ type timeLimitAppender struct { } func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return app.AppendWithCT(ref, lset, t, 0, v) +} + +func (app *timeLimitAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { if t > app.maxTime { return 0, storage.ErrOutOfBounds } - ref, err := app.Appender.Append(ref, lset, t, v) + ref, err := app.Appender.AppendWithCT(ref, lset, t, ct, v) if err != nil { return 0, err } @@ -371,6 +379,10 @@ type bucketLimitAppender struct { } func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return app.AppendHistogramWithCT(ref, lset, t, 0, h, fh) +} + +func (app *bucketLimitAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if h != nil { // Return with an early error if the histogram has too many buckets and the // schema is not exponential, in which case we can't reduce the resolution. @@ -397,7 +409,7 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe fh = fh.ReduceResolution(fh.Schema - 1) } } - ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) + ref, err := app.Appender.AppendHistogramWithCT(ref, lset, t, ct, h, fh) if err != nil { return 0, err } @@ -411,6 +423,10 @@ type maxSchemaAppender struct { } func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return app.AppendHistogramWithCT(ref, lset, t, 0, h, fh) +} + +func (app *maxSchemaAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if h != nil { if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema { h = h.ReduceResolution(app.maxSchema) @@ -421,7 +437,7 @@ func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels fh = fh.ReduceResolution(app.maxSchema) } } - ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) + ref, err := app.Appender.AppendHistogramWithCT(ref, lset, t, ct, h, fh) if err != nil { return 0, err } diff --git a/storage/fanout.go b/storage/fanout.go index 4d076788a7..aa7e5395d0 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -171,6 +171,20 @@ func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float return ref, nil } +func (f *fanoutAppender) AppendWithCT(ref SeriesRef, l labels.Labels, t, ct int64, v float64) (SeriesRef, error) { + ref, err := f.primary.AppendWithCT(ref, l, t, ct, v) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.AppendWithCT(ref, l, t, ct, v); err != nil { + return 0, err + } + } + return ref, nil +} + func (f *fanoutAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) { ref, err := f.primary.AppendExemplar(ref, l, e) if err != nil { @@ -199,6 +213,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64 return ref, nil } +func (f *fanoutAppender) AppendHistogramWithCT(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) { + ref, err := f.primary.AppendHistogramWithCT(ref, l, t, ct, h, fh) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.AppendHistogramWithCT(ref, l, t, ct, h, fh); err != nil { + return 0, err + } + } + return ref, nil +} + func (f *fanoutAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) { ref, err := f.primary.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh) if err != nil { diff --git a/storage/interface.go b/storage/interface.go index 32b90cc10a..0b01fbe568 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -265,6 +265,12 @@ type Appender interface { // If the reference is 0 it must not be used for caching. Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) + // AppendWithCT is like Append, but also stores an optional CT with to the sample. + // ct equal to 0 value means no CT. + // TODO(bwplotka): Consider adding CT (with 0 being empty) to Append once + // this mechanism is proven. + AppendWithCT(ref SeriesRef, l labels.Labels, t, ct int64, v float64) (SeriesRef, error) + // Commit submits the collected samples and purges the batch. If Commit // returns a non-nil error, it also rolls back all modifications made in // the appender so far, as Rollback would do. In any case, an Appender @@ -319,13 +325,19 @@ type HistogramAppender interface { // reference number is returned which can be used to add further // histograms in the same or later transactions. Returned reference // numbers are ephemeral and may be rejected in calls to Append() at any - // point. Adding the sample via Append() returns a new reference number. + // point. Adding the sample via AppendHistogram() returns a new reference number. // If the reference is 0, it must not be used for caching. // // For efficiency reasons, the histogram is passed as a // pointer. AppendHistogram won't mutate the histogram, but in turn // depends on the caller to not mutate it either. AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) + + // AppendHistogramWithCT is like AppendHistogram, but also stores CT with to the sample. + // TODO(bwplotka): Consider adding CT (with 0 being empty) to AppendHistogram once + // this mechanism is proven. + AppendHistogramWithCT(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) + // AppendHistogramCTZeroSample adds synthetic zero sample for the given ct timestamp, // which will be associated with given series, labels and the incoming // sample's t (timestamp). AppendHistogramCTZeroSample returns error if zero sample can't be @@ -355,6 +367,8 @@ type MetadataUpdater interface { } // CreatedTimestampAppender provides an interface for appending CT to storage. +// TODO(bwplotka): Consider add histogram CT zero sample methods here for consistency. +// TODO(bwplotka): This might be removed at some point, superseded by https://github.com/prometheus/prometheus/issues/14218 type CreatedTimestampAppender interface { // AppendCTZeroSample adds synthetic zero sample for the given ct timestamp, // which will be associated with given series, labels and the incoming diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b274707bff..ec89f20499 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -730,11 +730,12 @@ outer: default: } if t.shards.enqueue(s.Ref, timeSeries{ - seriesLabels: lbls, - metadata: meta, - timestamp: s.T, - value: s.V, - sType: tSample, + seriesLabels: lbls, + metadata: meta, + timestamp: s.T, + createdTimestamp: s.CT, + value: s.V, + sType: tSample, }) { continue outer } @@ -1351,13 +1352,14 @@ type queue struct { } type timeSeries struct { - seriesLabels labels.Labels - value float64 - histogram *histogram.Histogram - floatHistogram *histogram.FloatHistogram - metadata *metadata.Metadata - timestamp int64 - exemplarLabels labels.Labels + seriesLabels labels.Labels + value float64 + histogram *histogram.Histogram + floatHistogram *histogram.FloatHistogram + metadata *metadata.Metadata + timestamp int64 + createdTimestamp int64 + exemplarLabels labels.Labels // The type of series: sample, exemplar, or histogram. sType seriesType } @@ -1949,6 +1951,7 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, Value: d.value, Timestamp: d.timestamp, }) + pendingData[nPending].CreatedTimestamp = d.createdTimestamp nPendingSamples++ case tExemplar: pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, writev2.Exemplar{ diff --git a/storage/remote/write.go b/storage/remote/write.go index 51daeedb72..80c6268e4e 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -294,6 +294,14 @@ func (t *timestampTracker) Append(_ storage.SeriesRef, _ labels.Labels, ts int64 return 0, nil } +func (t *timestampTracker) AppendWithCT(_ storage.SeriesRef, _ labels.Labels, ts, _ int64, _ float64) (storage.SeriesRef, error) { + t.samples++ + if ts > t.highestTimestamp { + t.highestTimestamp = ts + } + return 0, nil +} + func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) { t.exemplars++ return 0, nil @@ -307,6 +315,14 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, return 0, nil } +func (t *timestampTracker) AppendHistogramWithCT(_ storage.SeriesRef, _ labels.Labels, ts, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + t.histograms++ + if ts > t.highestTimestamp { + t.highestTimestamp = ts + } + return 0, nil +} + func (t *timestampTracker) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, ct int64) (storage.SeriesRef, error) { t.samples++ if ct > t.highestTimestamp { diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 21e3693e50..da2d96c22b 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -673,11 +673,15 @@ type timeLimitAppender struct { } func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return app.AppendWithCT(ref, lset, t, 0, v) +} + +func (app *timeLimitAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { if t > app.maxTime { return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) } - ref, err := app.Appender.Append(ref, lset, t, v) + ref, err := app.Appender.AppendWithCT(ref, lset, t, ct, v) if err != nil { return 0, err } @@ -685,11 +689,15 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, } func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return app.AppendHistogramWithCT(ref, l, t, 0, h, fh) +} + +func (app *timeLimitAppender) AppendHistogramWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if t > app.maxTime { return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) } - ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh) + ref, err := app.Appender.AppendHistogramWithCT(ref, l, t, ct, h, fh) if err != nil { return 0, err } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 5bf6f5632e..9e8b8ab835 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -271,7 +271,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { for _, ts := range writeRequestFixture.Timeseries { labels := ts.ToLabels(&b, nil) for _, s := range ts.Samples { - requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) + requireEqual(t, mockSample{labels, s.Timestamp, 0, s.Value}, appendable.samples[i]) i++ } for _, e := range ts.Exemplars { @@ -282,10 +282,10 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fh := hp.ToFloatHistogram() - requireEqual(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) + requireEqual(t, mockHistogram{labels, hp.Timestamp, 0, nil, fh}, appendable.histograms[k]) } else { h := hp.ToIntHistogram() - requireEqual(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) + requireEqual(t, mockHistogram{labels, hp.Timestamp, 0, h, nil}, appendable.histograms[k]) } k++ @@ -500,27 +500,27 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { for _, s := range ts.Samples { if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { - requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0}, appendable.samples[i]) + requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0, 0}, appendable.samples[i]) i++ } - requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) + requireEqual(t, mockSample{ls, s.Timestamp, 0, s.Value}, appendable.samples[i]) i++ } for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fh := hp.ToFloatHistogram() if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { - requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, nil, &histogram.FloatHistogram{}}, appendable.histograms[k]) + requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, 0, nil, &histogram.FloatHistogram{}}, appendable.histograms[k]) k++ } - requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) + requireEqual(t, mockHistogram{ls, hp.Timestamp, 0, nil, fh}, appendable.histograms[k]) } else { h := hp.ToIntHistogram() if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { - requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, &histogram.Histogram{}, nil}, appendable.histograms[k]) + requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, 0, &histogram.Histogram{}, nil}, appendable.histograms[k]) k++ } - requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) + requireEqual(t, mockHistogram{ls, hp.Timestamp, 0, h, nil}, appendable.histograms[k]) } k++ } @@ -811,9 +811,9 @@ type mockAppendable struct { } type mockSample struct { - l labels.Labels - t int64 - v float64 + l labels.Labels + t, ct int64 + v float64 } type mockExemplar struct { @@ -824,10 +824,10 @@ type mockExemplar struct { } type mockHistogram struct { - l labels.Labels - t int64 - h *histogram.Histogram - fh *histogram.FloatHistogram + l labels.Labels + t, ct int64 + h *histogram.Histogram + fh *histogram.FloatHistogram } type mockMetadata struct { @@ -864,7 +864,11 @@ func (m *mockAppendable) SetOptions(_ *storage.AppendOptions) { panic("unimplemented") } -func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { +func (m *mockAppendable) Append(r storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return m.AppendWithCT(r, l, t, 0, v) +} + +func (m *mockAppendable) AppendWithCT(_ storage.SeriesRef, l labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { if m.appendSampleErr != nil { return 0, m.appendSampleErr } @@ -885,7 +889,7 @@ func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v } m.latestSample[l.Hash()] = t - m.samples = append(m.samples, mockSample{l, t, v}) + m.samples = append(m.samples, mockSample{l, t, ct, v}) return 0, nil } @@ -922,7 +926,11 @@ func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e return 0, nil } -func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { +func (m *mockAppendable) AppendHistogram(r storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return m.AppendHistogramWithCT(r, l, t, 0, h, fh) +} + +func (m *mockAppendable) AppendHistogramWithCT(_ storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if m.appendHistogramErr != nil { return 0, m.appendHistogramErr } @@ -952,7 +960,7 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t } else { m.latestFloatHist[l.Hash()] = t } - m.histograms = append(m.histograms, mockHistogram{l, t, h, fh}) + m.histograms = append(m.histograms, mockHistogram{l, t, ct, h, fh}) return 0, nil } @@ -989,10 +997,10 @@ func (m *mockAppendable) AppendHistogramCTZeroSample(_ storage.SeriesRef, l labe if h != nil { m.latestHistogram[l.Hash()] = ct - m.histograms = append(m.histograms, mockHistogram{l, ct, &histogram.Histogram{}, nil}) + m.histograms = append(m.histograms, mockHistogram{l, ct, 0, &histogram.Histogram{}, nil}) } else { m.latestFloatHist[l.Hash()] = ct - m.histograms = append(m.histograms, mockHistogram{l, ct, nil, &histogram.FloatHistogram{}}) + m.histograms = append(m.histograms, mockHistogram{l, ct, 0, nil, &histogram.FloatHistogram{}}) } return 0, nil } @@ -1032,6 +1040,6 @@ func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, l labels.Labels } m.latestSample[l.Hash()] = ct - m.samples = append(m.samples, mockSample{l, ct, 0}) + m.samples = append(m.samples, mockSample{l, ct, 0, 0}) return 0, nil } diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index a3b30b6425..042d2c03ad 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -839,13 +839,21 @@ func (s syncAppendable) Appender(ctx context.Context) storage.Appender { } func (s syncAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return s.AppendWithCT(ref, l, t, 0, v) +} + +func (s syncAppender) AppendWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { s.lock.Lock() defer s.lock.Unlock() - return s.Appender.Append(ref, l, t, v) + return s.Appender.AppendWithCT(ref, l, t, ct, v) } func (s syncAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, f *histogram.FloatHistogram) (storage.SeriesRef, error) { + return s.AppendHistogramWithCT(ref, l, t, 0, h, f) +} + +func (s syncAppender) AppendHistogramWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, f *histogram.FloatHistogram) (storage.SeriesRef, error) { s.lock.Lock() defer s.lock.Unlock() - return s.Appender.AppendHistogram(ref, l, t, h, f) + return s.Appender.AppendHistogramWithCT(ref, l, t, ct, h, f) } diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index cf4a977288..825c2ec454 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -222,7 +222,9 @@ func (m *dbMetrics) Unregister() { } } -// DB represents a WAL-only storage. It implements storage.DB. +var _ storage.Appendable = &DB{} + +// DB represents a WAL-only storage. type DB struct { mtx sync.RWMutex logger *slog.Logger @@ -452,7 +454,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return } decoded <- series - case record.Samples: + case record.Samples, record.SamplesWithCT: samples := db.walReplaySamplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { @@ -762,6 +764,8 @@ func (db *DB) Close() error { return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err() } +var _ storage.Appender = &appender{} + type appender struct { *DB hints *storage.AppendOptions @@ -790,6 +794,10 @@ func (a *appender) SetOptions(opts *storage.AppendOptions) { } func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return a.AppendWithCT(ref, l, t, 0, v) +} + +func (a *appender) AppendWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { // series references and chunk references are identical for agent mode. headRef := chunks.HeadSeriesRef(ref) @@ -826,11 +834,16 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo return 0, storage.ErrOutOfOrderSample } + if ct != 0 && ct > t { + ct = 0 + } + // NOTE: always modify pendingSamples and sampleSeries together. a.pendingSamples = append(a.pendingSamples, record.RefSample{ Ref: series.ref, T: t, V: v, + CT: ct, }) a.sampleSeries = append(a.sampleSeries, series) @@ -905,6 +918,12 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem return storage.SeriesRef(s.ref), nil } +func (a *appender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, _ int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + // TODO(bwplotka): Add support for native histograms with CTs in WAL; add/consolidate records. + // We ignore CT for now. + return a.AppendHistogram(ref, lset, t, h, fh) +} + func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if h != nil { if err := h.Validate(); err != nil { diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index db98e87408..b79fcbf711 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -224,7 +224,7 @@ func TestCommit(t *testing.T) { require.NoError(t, err) walSeriesCount += len(series) - case record.Samples: + case record.Samples, record.SamplesWithCT: var samples []record.RefSample samples, err = dec.Samples(rec, samples) require.NoError(t, err) @@ -357,7 +357,7 @@ func TestRollback(t *testing.T) { require.NoError(t, err) walSeriesCount += len(series) - case record.Samples: + case record.Samples, record.SamplesWithCT: var samples []record.RefSample samples, err = dec.Samples(rec, samples) require.NoError(t, err) @@ -1349,7 +1349,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample { series, err := dec.Series(rec, nil) require.NoError(t, err) lastSeries = series[0] - case record.Samples: + case record.Samples, record.SamplesWithCT: samples, err = dec.Samples(rec, samples[:0]) require.NoError(t, err) for _, s := range samples { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index ab2ece9e3b..2b320b68c9 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4497,7 +4497,7 @@ func testOOOWALWrite(t *testing.T, series, err := dec.Series(rec, nil) require.NoError(t, err) records = append(records, series) - case record.Samples: + case record.Samples, record.SamplesWithCT: samples, err := dec.Samples(rec, nil) require.NoError(t, err) records = append(records, samples) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index c94c42bc53..4f611bd7ad 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -56,6 +56,16 @@ func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 return a.app.Append(ref, lset, t, v) } +func (a *initAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { + if a.app != nil { + return a.app.AppendWithCT(ref, lset, t, ct, v) + } + + a.head.initTime(t) + a.app = a.head.appender() + return a.app.AppendWithCT(ref, lset, t, ct, v) +} + func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { // Check if exemplar storage is enabled. if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { @@ -77,19 +87,29 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t if a.app != nil { return a.app.AppendHistogram(ref, l, t, h, fh) } + a.head.initTime(t) a.app = a.head.appender() - return a.app.AppendHistogram(ref, l, t, h, fh) } +func (a *initAppender) AppendHistogramWithCT(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if a.app != nil { + return a.app.AppendHistogramWithCT(ref, l, t, ct, h, fh) + } + + a.head.initTime(t) + a.app = a.head.appender() + return a.app.AppendHistogramWithCT(ref, l, t, ct, h, fh) +} + func (a *initAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if a.app != nil { return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh) } + a.head.initTime(t) a.app = a.head.appender() - return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh) } @@ -109,7 +129,6 @@ func (a *initAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab a.head.initTime(t) a.app = a.head.appender() - return a.app.AppendCTZeroSample(ref, lset, t, ct) } @@ -340,6 +359,10 @@ func (a *headAppender) SetOptions(opts *storage.AppendOptions) { } func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + return a.AppendWithCT(ref, lset, t, 0, v) +} + +func (a *headAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, v float64) (storage.SeriesRef, error) { // Fail fast if OOO is disabled and the sample is out of bounds. // Otherwise a full check will be done later to decide if the sample is in-order or out-of-order. if a.oooTimeWindow == 0 && t < a.minValidTime { @@ -401,11 +424,17 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 if t > a.maxt { a.maxt = t } - + if ct != 0 && ct > t { + // TODO(bwplotka): Invalid (in future) CTs, ignore it. Add metric to report this event. + // WARN FOR PR experiment only + slog.Warn("got CT in future?", "ct", ct, "t", t, "lset", lset.String()) + ct = 0 + } a.samples = append(a.samples, record.RefSample{ Ref: s.ref, T: t, V: v, + CT: ct, }) a.sampleSeries = append(a.sampleSeries, s) return storage.SeriesRef(s.ref), nil @@ -413,7 +442,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 // AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns // error when sample can't be appended. See -// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation. +// storage.WithCTAppender.AppendCTZeroSample for further documentation. func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) { if ct >= t { return 0, storage.ErrCTNewerThanSample @@ -447,7 +476,7 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab if ct > a.maxt { a.maxt = ct } - a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0}) + a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0, CT: ct}) a.sampleSeries = append(a.sampleSeries, s) return storage.SeriesRef(s.ref), nil } @@ -647,6 +676,12 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, return storage.SeriesRef(s.ref), nil } +func (a *headAppender) AppendHistogramWithCT(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + // TODO(bwplotka): Add support for native histograms with CTs in WAL; add/consolidate records. + // We ignore CT for now. + return a.AppendHistogram(ref, lset, t, h, fh) +} + func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { if !a.head.opts.EnableNativeHistograms.Load() { return 0, storage.ErrNativeHistogramsDisabled diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 065e5ff008..eaf79c5d3d 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -179,12 +179,12 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { for r.Next() { rec := r.Record() - switch dec.Type(rec) { + switch typ := dec.Type(rec); typ { case record.Series: series, err := dec.Series(rec, nil) require.NoError(t, err) recs = append(recs, series) - case record.Samples: + case record.Samples, record.SamplesWithCT: samples, err := dec.Samples(rec, nil) require.NoError(t, err) recs = append(recs, samples) @@ -209,7 +209,7 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { require.NoError(t, err) recs = append(recs, exemplars) default: - require.Fail(t, "unknown record type") + require.Fail(t, "unknown record type", typ) } } require.NoError(t, r.Err()) @@ -6295,15 +6295,14 @@ func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing require.ErrorIs(t, err, storage.NewDuplicateHistogramToFloatErr(2_000, 10.0)) } -func TestHeadAppender_AppendCT(t *testing.T) { +func TestHeadAppender_AppendCTZeroSample(t *testing.T) { testHistogram := tsdbutil.GenerateTestHistogram(1) testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1) type appendableSamples struct { - ts int64 + ts, ct int64 fSample float64 h *histogram.Histogram fh *histogram.FloatHistogram - ct int64 } for _, tc := range []struct { name string @@ -6516,6 +6515,82 @@ func TestHeadAppender_AppendCT(t *testing.T) { } } +func TestHeadAppender_AppendWithCT(t *testing.T) { + type appendableSamples struct { + t, ct int64 + v float64 + } + for _, tc := range []struct { + name string + appendableSamples []appendableSamples + expectedSamples []record.RefSample + }{ + { + name: "sample with non-zero ct", + appendableSamples: []appendableSamples{ + {t: 100, v: 10, ct: 1}, + }, + expectedSamples: []record.RefSample{ + {T: 100, V: 10, CT: 1, Ref: 1}, + }, + }, + { + name: "sample with ct > t are ignored", + appendableSamples: []appendableSamples{ + {t: 100, v: 10, ct: 101}, + }, + expectedSamples: []record.RefSample{ + {T: 100, V: 10, Ref: 1}, + }, + }, + { + name: "multiple samples + same ct", + appendableSamples: []appendableSamples{ + {t: 100, v: 10, ct: 1}, + {t: 101, v: 10, ct: 1}, + }, + expectedSamples: []record.RefSample{ + {T: 100, V: 10, CT: 1, Ref: 1}, + {T: 101, V: 10, CT: 1, Ref: 1}, + }, + }, + { + name: "multiple samples + different ct", + appendableSamples: []appendableSamples{ + {t: 100, v: 10, ct: 1}, + {t: 102, v: 10, ct: 101}, + }, + expectedSamples: []record.RefSample{ + {T: 100, V: 10, CT: 1, Ref: 1}, + {T: 102, V: 10, CT: 101, Ref: 1}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + h, w := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + defer func() { + require.NoError(t, h.Close()) + }() + a := h.Appender(context.Background()) + + lbls := labels.FromStrings("foo", "bar") + for _, sample := range tc.appendableSamples { + _, err := a.AppendWithCT(0, lbls, sample.t, sample.ct, sample.v) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + + recs := readTestWAL(t, w.Dir()) + _, ok := recs[0].([]record.RefSeries) + require.True(t, ok, "expected first record to be a RefSeries") + actualType := reflect.TypeOf(recs[1]) + require.Equal(t, reflect.TypeOf([]record.RefSample{}), actualType, "expected second record to be a record.RefSample") + + require.Equal(t, tc.expectedSamples, recs[1]) + }) + } +} + func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) { // Use a chunk range of 1 here so that if we attempted to determine if the head // was compactable using default values for min and max times, `Head.compactable()` diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ad03fa4766..8aa4be94af 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -142,7 +142,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return } decoded <- series - case record.Samples: + case record.Samples, record.SamplesWithCT: samples := h.wlReplaySamplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { @@ -686,7 +686,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch var err error rec := r.Record() switch dec.Type(rec) { - case record.Samples: + case record.Samples, record.SamplesWithCT: samples := h.wlReplaySamplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 692976cdf8..c6152e5e02 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -56,6 +56,8 @@ const ( CustomBucketsHistogramSamples Type = 9 // CustomBucketsFloatHistogramSamples is used to match WAL records of type Float Histogram with custom buckets. CustomBucketsFloatHistogramSamples Type = 10 + // SamplesWithCT is an enhanced sample record that allows storing an optional CT per sample. + SamplesWithCT Type = 11 ) func (rt Type) String() string { @@ -64,6 +66,8 @@ func (rt Type) String() string { return "series" case Samples: return "samples" + case SamplesWithCT: + return "samples-with-ct" case Tombstones: return "tombstones" case Exemplars: @@ -155,12 +159,12 @@ type RefSeries struct { Labels labels.Labels } -// RefSample is a timestamp/value pair associated with a reference to a series. +// RefSample is a timestamp/ct/value struct associated with a reference to a series. // TODO(beorn7): Perhaps make this "polymorphic", including histogram and float-histogram pointers? Then get rid of RefHistogramSample. type RefSample struct { - Ref chunks.HeadSeriesRef - T int64 - V float64 + Ref chunks.HeadSeriesRef + T, CT int64 + V float64 } // RefMetadata is the metadata associated with a series ID. @@ -180,6 +184,7 @@ type RefExemplar struct { } // RefHistogramSample is a histogram. +// TODO(bwplotka): Add support for CT. type RefHistogramSample struct { Ref chunks.HeadSeriesRef T int64 @@ -187,6 +192,7 @@ type RefHistogramSample struct { } // RefFloatHistogramSample is a float histogram. +// TODO(bwplotka): Add support for CT. type RefFloatHistogramSample struct { Ref chunks.HeadSeriesRef T int64 @@ -215,7 +221,9 @@ func (d *Decoder) Type(rec []byte) Type { return Unknown } switch t := Type(rec[0]); t { - case Series, Samples, Tombstones, Exemplars, MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, CustomBucketsHistogramSamples, CustomBucketsFloatHistogramSamples: + case Series, Samples, SamplesWithCT, Tombstones, Exemplars, + MmapMarkers, Metadata, HistogramSamples, FloatHistogramSamples, + CustomBucketsHistogramSamples, CustomBucketsFloatHistogramSamples: return t } return Unknown @@ -304,10 +312,17 @@ func (d *Decoder) DecodeLabels(dec *encoding.Decbuf) labels.Labels { // Samples appends samples in rec to the given slice. func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { dec := encoding.Decbuf{B: rec} - - if Type(dec.Byte()) != Samples { - return nil, errors.New("invalid record type") + switch typ := dec.Byte(); Type(typ) { + case Samples: + return d.samples(&dec, samples) + case SamplesWithCT: + return d.samplesWithCT(&dec, samples) + default: + return nil, fmt.Errorf("invalid record type %v, expected Samples(2) or SamplesWithCT(11)", typ) } +} + +func (d *Decoder) samples(dec *encoding.Decbuf, samples []RefSample) ([]RefSample, error) { if dec.Len() == 0 { return samples, nil } @@ -340,6 +355,42 @@ func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) return samples, nil } +func (d *Decoder) samplesWithCT(dec *encoding.Decbuf, samples []RefSample) ([]RefSample, error) { + if dec.Len() == 0 { + return samples, nil + } + var ( + baseRef = dec.Be64() + baseTime = dec.Be64int64() + baseCT = dec.Be64int64() + ) + // Allow 1 byte for each varint and 8 for the value; the output slice must be at least that big. + if minSize := dec.Len() / (1 + 1 + 8); cap(samples) < minSize { + samples = make([]RefSample, 0, minSize) + } + for len(dec.B) > 0 && dec.Err() == nil { + dref := dec.Varint64() + dtime := dec.Varint64() + dCT := dec.Varint64() + val := dec.Be64() + + samples = append(samples, RefSample{ + Ref: chunks.HeadSeriesRef(int64(baseRef) + dref), + T: baseTime + dtime, + CT: baseCT + dCT, + V: math.Float64frombits(val), + }) + } + + if dec.Err() != nil { + return nil, fmt.Errorf("decode error after %d samples: %w", len(samples), dec.Err()) + } + if len(dec.B) > 0 { + return nil, fmt.Errorf("unexpected %d bytes left in entry", len(dec.B)) + } + return samples, nil +} + // Tombstones appends tombstones in rec to the given slice. func (d *Decoder) Tombstones(rec []byte, tstones []tombstones.Stone) ([]tombstones.Stone, error) { dec := encoding.Decbuf{B: rec} @@ -665,7 +716,17 @@ func EncodeLabels(buf *encoding.Encbuf, lbls labels.Labels) { } // Samples appends the encoded samples to b and returns the resulting slice. +// Depending on the CT existence it either writes Samples or SamplesWithCT record. func (e *Encoder) Samples(samples []RefSample, b []byte) []byte { + for _, s := range samples { + if s.CT != 0 { + return e.samplesWithCT(samples, b) + } + } + return e.samples(samples, b) +} + +func (e *Encoder) samples(samples []RefSample, b []byte) []byte { buf := encoding.Encbuf{B: b} buf.PutByte(byte(Samples)) @@ -688,6 +749,32 @@ func (e *Encoder) Samples(samples []RefSample, b []byte) []byte { return buf.Get() } +func (e *Encoder) samplesWithCT(samples []RefSample, b []byte) []byte { + buf := encoding.Encbuf{B: b} + buf.PutByte(byte(SamplesWithCT)) + + if len(samples) == 0 { + return buf.Get() + } + + // Store base timestamp, base CT and base reference number of first sample. + // All samples encode their timestamp, CT and ref as delta to those. + // TODO(ridwanmsharif): Should the timestamp be encoded as a delta with the CT? + first := samples[0] + + buf.PutBE64(uint64(first.Ref)) + buf.PutBE64int64(first.T) + buf.PutBE64int64(first.CT) + + for _, s := range samples { + buf.PutVarint64(int64(s.Ref) - int64(first.Ref)) + buf.PutVarint64(s.T - first.T) + buf.PutVarint64(s.CT - first.CT) + buf.PutBE64(math.Float64bits(s.V)) + } + return buf.Get() +} + // Tombstones appends the encoded tombstones to b and returns the resulting slice. func (e *Encoder) Tombstones(tstones []tombstones.Stone, b []byte) []byte { buf := encoding.Encbuf{B: b} diff --git a/tsdb/record/record_test.go b/tsdb/record/record_test.go index 9b2eb89c5a..797c33ac97 100644 --- a/tsdb/record/record_test.go +++ b/tsdb/record/record_test.go @@ -84,6 +84,15 @@ func TestRecord_EncodeDecode(t *testing.T) { require.NoError(t, err) require.Equal(t, samples, decSamples) + samplesWithCT := []RefSample{ + {Ref: 0, T: 12423423, CT: 14, V: 1.2345}, + {Ref: 123, T: -1231, CT: 14, V: -123}, + {Ref: 2, T: 0, CT: 14, V: 99999}, + } + decSamplesWithCT, err := dec.Samples(enc.Samples(samplesWithCT, nil), nil) + require.NoError(t, err) + require.Equal(t, samplesWithCT, decSamplesWithCT) + // Intervals get split up into single entries. So we don't get back exactly // what we put in. tstones := []tombstones.Stone{ diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 5c607d7030..4380d88bea 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -191,7 +191,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He stats.TotalSeries += len(series) stats.DroppedSeries += len(series) - len(repl) - case record.Samples: + case record.Samples, record.SamplesWithCT: samples, err = dec.Samples(rec, samples) if err != nil { return nil, fmt.Errorf("decode samples: %w", err) diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index a052de9258..b48ecaeec3 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -323,7 +323,7 @@ func TestCheckpoint(t *testing.T) { case record.Series: series, err = dec.Series(rec, series) require.NoError(t, err) - case record.Samples: + case record.Samples, record.SamplesWithCT: samples, err := dec.Samples(rec, nil) require.NoError(t, err) for _, s := range samples { diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index ca74a9ceaf..fbe5735734 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -503,7 +503,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { } w.writer.StoreSeries(series, segmentNum) - case record.Samples: + case record.Samples, record.SamplesWithCT: // If we're not tailing a segment we can ignore any samples records we see. // This speeds up replay of the WAL by > 10x. if !tail {