From 7bfed4924e17967ed2d7197dce4eb8bffd75380f Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 7 Dec 2023 08:24:33 +0000 Subject: [PATCH] Proposed changes to Append CT PR. Changes: * Changed textparse Parser interface for consistency and robustness. * Changed CT interface to be more explicit and handle validation. * Simplified test, change scrapeManager to allow testability. * Added TODOs. Signed-off-by: bwplotka --- cmd/prometheus/main.go | 6 +- config/config.go | 9 +- model/textparse/interface.go | 10 +- model/textparse/openmetricsparse.go | 9 +- model/textparse/promparse.go | 9 +- model/textparse/protobufparse.go | 22 +++-- model/textparse/protobufparse_test.go | 34 ++++--- scrape/helpers_test.go | 29 ++---- scrape/manager.go | 8 +- scrape/manager_test.go | 128 ++++++++++++++------------ scrape/scrape.go | 55 ++++++----- scrape/scrape_test.go | 3 + storage/fanout.go | 6 +- storage/interface.go | 29 ++++-- tsdb/agent/db.go | 5 +- tsdb/agent/db_test.go | 4 + tsdb/db_test.go | 68 ++++++++++++++ tsdb/head.go | 4 + tsdb/head_append.go | 37 ++++---- 19 files changed, 287 insertions(+), 188 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index fa5d65e8e0..4715fc9c3c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -209,12 +209,12 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) - case "created-timestamp-ingestion": - c.scrape.EnableCreatedTimestampIngestion = true + case "created-timestamp-zero-ingestion": + c.scrape.EnableCreatedTimestampZeroIngestion = true // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols - level.Info(logger).Log("msg", "Experimental created timestamp ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) + level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) case "": continue case "promql-at-modifier", "promql-negative-offset": diff --git a/config/config.go b/config/config.go index 4039015feb..ddcca84dc7 100644 --- a/config/config.go +++ b/config/config.go @@ -454,15 +454,18 @@ var ( OpenMetricsText1_0_0: "application/openmetrics-text;version=1.0.0", } + // DefaultScrapeProtocols is the set of scrape protocols that will be proposed + // to scrape target, ordered by priority. DefaultScrapeProtocols = []ScrapeProtocol{ OpenMetricsText1_0_0, OpenMetricsText0_0_1, PrometheusText0_0_4, } - // DefaultProtoFirstScrapeProtocols is the set of scrape protocols that favors protobuf - // Prometheus exposition format. Used by default for certain feature-flags like - // "native-histograms" and "created-timestamp-ingestion". 
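// Illustrative, self-contained sketch of why the feature flag also flips the default
// scrape_protocols: created timestamps are currently only parsed from the protobuf
// exposition format, so the proto-first ordering has to win the (simplified here as
// "first mutually supported protocol") negotiation for CT ingestion to see any data.
// Protocol names mirror config/config.go; everything else below is made up for the
// example and is not part of the proposed patch.
package main

import "fmt"

func firstSupported(preferred []string, supportedByTarget map[string]bool) string {
	for _, p := range preferred {
		if supportedByTarget[p] {
			return p
		}
	}
	return "none"
}

func main() {
	defaultScrapeProtocols := []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}
	defaultProtoFirstScrapeProtocols := append([]string{"PrometheusProto"}, defaultScrapeProtocols...)

	target := map[string]bool{"PrometheusProto": true, "PrometheusText0.0.4": true, "OpenMetricsText1.0.0": true}

	fmt.Println("default ordering picks:    ", firstSupported(defaultScrapeProtocols, target))          // OpenMetricsText1.0.0
	fmt.Println("proto-first ordering picks:", firstSupported(defaultProtoFirstScrapeProtocols, target)) // PrometheusProto
}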
+ // DefaultProtoFirstScrapeProtocols is like DefaultScrapeProtocols, but it + // favors protobuf Prometheus exposition format. + // Used by default for certain feature-flags like + // "native-histograms" and "created-timestamp-zero-ingestion". DefaultProtoFirstScrapeProtocols = []ScrapeProtocol{ PrometheusProto, OpenMetricsText1_0_0, diff --git a/model/textparse/interface.go b/model/textparse/interface.go index 2f5fdbc3bf..df4259c85c 100644 --- a/model/textparse/interface.go +++ b/model/textparse/interface.go @@ -16,8 +16,6 @@ package textparse import ( "mime" - "github.com/gogo/protobuf/types" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -66,10 +64,10 @@ type Parser interface { // retrieved (including the case where no exemplars exist at all). Exemplar(l *exemplar.Exemplar) bool - // CreatedTimestamp writes the created timestamp of the current sample - // into the passed timestamp. It returns false if no created timestamp - // exists or if the metric type does not support created timestamps. - CreatedTimestamp(ct *types.Timestamp) bool + // CreatedTimestamp returns the created timestamp (in milliseconds) for the + // current sample. It returns nil if it is unknown e.g. if it wasn't set, + // if the scrape protocol or metric type does not support created timestamps. + CreatedTimestamp() *int64 // Next advances the parser to the next sample. It returns false if no // more samples were read or an error occurred. diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index bb50755441..f0c383723d 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -24,8 +24,6 @@ import ( "strings" "unicode/utf8" - "github.com/gogo/protobuf/types" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -213,9 +211,10 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool { return true } -// CreatedTimestamp returns false because OpenMetricsParser does not support created timestamps (yet). -func (p *OpenMetricsParser) CreatedTimestamp(_ *types.Timestamp) bool { - return false +// CreatedTimestamp returns nil as it's not implemented yet. +// TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980 +func (p *OpenMetricsParser) CreatedTimestamp() *int64 { + return nil } // nextToken returns the next token from the openMetricsLexer. diff --git a/model/textparse/promparse.go b/model/textparse/promparse.go index b3fa2d8a6d..935801fb9e 100644 --- a/model/textparse/promparse.go +++ b/model/textparse/promparse.go @@ -26,8 +26,6 @@ import ( "unicode/utf8" "unsafe" - "github.com/gogo/protobuf/types" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -247,9 +245,10 @@ func (p *PromParser) Exemplar(*exemplar.Exemplar) bool { return false } -// CreatedTimestamp returns false because PromParser does not support created timestamps. -func (p *PromParser) CreatedTimestamp(_ *types.Timestamp) bool { - return false +// CreatedTimestamp returns nil as it's not implemented yet. +// TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980 +func (p *PromParser) CreatedTimestamp() *int64 { + return nil } // nextToken returns the next token from the promlexer. 
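// A minimal, self-contained sketch of how a caller consumes the new
// CreatedTimestamp() *int64 contract: a nil return simply means "no created
// timestamp available" (text formats, or a metric type without CT), so there is
// no separate "found" boolean to thread around. miniParser is a stand-in for the
// real textparse.Parser and is not part of the proposed patch.
package main

import "fmt"

type miniParser struct {
	cts []*int64 // One entry per sample; nil means the CT is unknown or unsupported.
	i   int
}

func (p *miniParser) Next() bool               { p.i++; return p.i <= len(p.cts) }
func (p *miniParser) CreatedTimestamp() *int64 { return p.cts[p.i-1] }

func main() {
	ct := int64(1000)
	p := &miniParser{cts: []*int64{&ct, nil}} // First sample carries a CT, second does not.
	for p.Next() {
		if c := p.CreatedTimestamp(); c != nil {
			fmt.Println("CT known: would append a zero sample at", *c, "ms before the real sample")
		} else {
			fmt.Println("no CT: append the sample as usual")
		}
	}
}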
It skips over tabs diff --git a/model/textparse/protobufparse.go b/model/textparse/protobufparse.go index 23afb5c596..baede7e1dd 100644 --- a/model/textparse/protobufparse.go +++ b/model/textparse/protobufparse.go @@ -360,22 +360,26 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { return true } -func (p *ProtobufParser) CreatedTimestamp(ct *types.Timestamp) bool { - var foundCT *types.Timestamp +// CreatedTimestamp returns CT or nil if CT is not present or +// invalid (as timestamp e.g. negative value) on counters, summaries or histograms. +func (p *ProtobufParser) CreatedTimestamp() *int64 { + var ct *types.Timestamp switch p.mf.GetType() { case dto.MetricType_COUNTER: - foundCT = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp() + ct = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp() case dto.MetricType_SUMMARY: - foundCT = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp() + ct = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp() case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: - foundCT = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp() + ct = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp() default: } - if foundCT == nil { - return false + ctAsTime, err := types.TimestampFromProto(ct) + if err != nil { + // Errors means ct == nil or invalid timestamp, which we silently ignore. + return nil } - *ct = *foundCT - return true + ctMilis := ctAsTime.UnixMilli() + return &ctMilis } // Next advances the parser to the next "sample" (emulating the behavior of a diff --git a/model/textparse/protobufparse_test.go b/model/textparse/protobufparse_test.go index d83f2088a1..b9530fe0fe 100644 --- a/model/textparse/protobufparse_test.go +++ b/model/textparse/protobufparse_test.go @@ -21,7 +21,6 @@ import ( "testing" "github.com/gogo/protobuf/proto" - "github.com/gogo/protobuf/types" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/exemplar" @@ -630,7 +629,7 @@ func TestProtobufParse(t *testing.T) { shs *histogram.Histogram fhs *histogram.FloatHistogram e []exemplar.Exemplar - ct *types.Timestamp + ct int64 } inputBuf := createTestProtoBuf(t) @@ -1069,7 +1068,7 @@ func TestProtobufParse(t *testing.T) { { m: "test_counter_with_createdtimestamp", v: 42, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_counter_with_createdtimestamp", ), @@ -1085,7 +1084,7 @@ func TestProtobufParse(t *testing.T) { { m: "test_summary_with_createdtimestamp_count", v: 42, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_summary_with_createdtimestamp_count", ), @@ -1093,7 +1092,7 @@ func TestProtobufParse(t *testing.T) { { m: "test_summary_with_createdtimestamp_sum", v: 1.234, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_summary_with_createdtimestamp_sum", ), @@ -1108,7 +1107,7 @@ func TestProtobufParse(t *testing.T) { }, { m: "test_histogram_with_createdtimestamp", - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, shs: &histogram.Histogram{ CounterResetHint: histogram.UnknownCounterReset, PositiveSpans: []histogram.Span{}, @@ -1128,7 +1127,7 @@ func TestProtobufParse(t *testing.T) { }, { m: "test_gaugehistogram_with_createdtimestamp", - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, shs: &histogram.Histogram{ CounterResetHint: histogram.GaugeType, PositiveSpans: []histogram.Span{}, @@ -1887,7 
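// A worked example (standard library only, not part of the proposed patch) of the
// conversion the parser now performs and why the test fixtures change from
// &types.Timestamp{Seconds: 1, Nanos: 1} to the plain int64 1000: the protobuf
// timestamp is converted to Unix milliseconds and the leftover nanosecond is
// truncated.
package main

import (
	"fmt"
	"time"
)

func main() {
	ct := time.Unix(1, 1)       // Seconds: 1, Nanos: 1, as in the old fixtures.
	fmt.Println(ct.UnixMilli()) // Prints 1000: 1s -> 1000ms, the extra 1ns is dropped.
}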
+1886,7 @@ func TestProtobufParse(t *testing.T) { { // 83 m: "test_counter_with_createdtimestamp", v: 42, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_counter_with_createdtimestamp", ), @@ -1903,7 +1902,7 @@ func TestProtobufParse(t *testing.T) { { // 86 m: "test_summary_with_createdtimestamp_count", v: 42, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_summary_with_createdtimestamp_count", ), @@ -1911,7 +1910,7 @@ func TestProtobufParse(t *testing.T) { { // 87 m: "test_summary_with_createdtimestamp_sum", v: 1.234, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_summary_with_createdtimestamp_sum", ), @@ -1926,7 +1925,7 @@ func TestProtobufParse(t *testing.T) { }, { // 90 m: "test_histogram_with_createdtimestamp", - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, shs: &histogram.Histogram{ CounterResetHint: histogram.UnknownCounterReset, PositiveSpans: []histogram.Span{}, @@ -1946,7 +1945,7 @@ func TestProtobufParse(t *testing.T) { }, { // 93 m: "test_gaugehistogram_with_createdtimestamp", - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, shs: &histogram.Histogram{ CounterResetHint: histogram.GaugeType, PositiveSpans: []histogram.Span{}, @@ -1981,10 +1980,9 @@ func TestProtobufParse(t *testing.T) { m, ts, v := p.Series() var e exemplar.Exemplar - var ct types.Timestamp p.Metric(&res) eFound := p.Exemplar(&e) - ctFound := p.CreatedTimestamp(&ct) + ct := p.CreatedTimestamp() require.Equal(t, exp[i].m, string(m), "i: %d", i) if ts != nil { require.Equal(t, exp[i].t, *ts, "i: %d", i) @@ -2000,11 +1998,11 @@ func TestProtobufParse(t *testing.T) { require.Equal(t, exp[i].e[0], e, "i: %d", i) require.False(t, p.Exemplar(&e), "too many exemplars returned, i: %d", i) } - if exp[i].ct != nil { - require.Equal(t, true, ctFound, "i: %d", i) - require.Equal(t, exp[i].ct.String(), ct.String(), "i: %d", i) + if exp[i].ct != 0 { + require.NotNilf(t, ct, "i: %d", i) + require.Equal(t, exp[i].ct, *ct, "i: %d", i) } else { - require.Equal(t, false, ctFound, "i: %d", i) + require.Nilf(t, ct, "i: %d", i) } case EntryHistogram: diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 4a7ca1a05e..b10ca7d45a 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -58,7 +58,7 @@ func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.M return 0, nil } -func (a nopAppender) AppendCreatedTimestamp(storage.SeriesRef, labels.Labels, int64) (storage.SeriesRef, error) { +func (a nopAppender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) { return 0, nil } @@ -162,19 +162,8 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L return a.next.UpdateMetadata(ref, l, m) } -func (a *collectResultAppender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) { - a.mtx.Lock() - defer a.mtx.Unlock() - a.pendingFloats = append(a.pendingFloats, floatSample{ - metric: l, - t: t, - f: 0.0, - }) - - if ref == 0 { - ref = storage.SeriesRef(rand.Uint64()) - } - return ref, nil +func (a *collectResultAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t int64, ct int64) (storage.SeriesRef, error) { + return a.Append(ref, l, ct, 0.0) } func (a *collectResultAppender) Commit() error { @@ -221,18 +210,20 @@ func (a *collectResultAppender) String() string { return sb.String() } -// 
serializeMetricFamily serializes a MetricFamily into a byte slice. -// Needed because Prometheus has its own implementation of protobuf -// marshalling and unmarshalling that only supports 'encoding=delimited'. +// protoMarshalDelimited marshals a MetricFamily into a delimited +// Prometheus proto exposition format bytes (known as 'encoding=delimited`) +// // See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers -func serializeMetricFamily(t *testing.T, mf *dto.MetricFamily) []byte { +func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte { t.Helper() - buf := &bytes.Buffer{} + protoBuf, err := proto.Marshal(mf) require.NoError(t, err) varintBuf := make([]byte, binary.MaxVarintLen32) varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf))) + + buf := &bytes.Buffer{} buf.Write(varintBuf[:varintLength]) buf.Write(protoBuf) return buf.Bytes() diff --git a/scrape/manager.go b/scrape/manager.go index f3e585251f..faa46f54d6 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -78,11 +78,15 @@ type Options struct { EnableMetadataStorage bool // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration - // Option to enable the ingestion of the created timestamp of a metric. - EnableCreatedTimestampIngestion bool + // Option to enable the ingestion of the created timestamp as a synthetic zero sample. + // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md + EnableCreatedTimestampZeroIngestion bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption + + // private option for testability. + skipOffsetting bool } // Manager maintains a set of scrape pools and manages start/stop cycles diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 7837891bdb..93970a4573 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -15,12 +15,13 @@ package scrape import ( "context" - "errors" + "fmt" "net/http" "net/http/httptest" "net/url" "os" "strconv" + "sync" "testing" "time" @@ -724,59 +725,53 @@ scrape_configs: require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) } -func TestManagerScrapeCreatedTimestamp(t *testing.T) { - counterType := dto.MetricType_COUNTER - now := time.Now() - nowMs := now.UnixMilli() - - makeMfWithCT := func(ct time.Time) *dto.MetricFamily { - return &dto.MetricFamily{ - Name: proto.String("expected_counter"), - Type: &counterType, - Metric: []*dto.Metric{ - { - Counter: &dto.Counter{ - Value: proto.Float64(1.0), - CreatedTimestamp: timestamppb.New(ct), - }, - }, - }, - } - } +// TestManagerCTZeroIngestion tests scrape manager for CT cases. +func TestManagerCTZeroIngestion(t *testing.T) { + const mName = "expected_counter" for _, tc := range []struct { name string - mf *dto.MetricFamily - ingestCT bool - expectedScrapedValues []float64 + counterSample *dto.Counter + enableCTZeroIngestion bool + + expectedValues []float64 }{ { - name: "valid counter/Ingestion enabled", - mf: makeMfWithCT(now), - ingestCT: true, - expectedScrapedValues: []float64{0.0, 1.0}, + name: "disabled with CT on counter", + counterSample: &dto.Counter{ + Value: proto.Float64(1.0), + // Timestamp does not matter as long as it exists in this test. 
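// A self-contained sketch of the 'encoding=delimited' framing that
// protoMarshalDelimited produces: every message is prefixed with its byte length
// as an unsigned varint. Plain byte slices stand in for marshalled
// dto.MetricFamily payloads so the example has no protobuf dependency; it is not
// part of the proposed patch.
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

func frame(msgs ...[]byte) []byte {
	var buf bytes.Buffer
	var lenBuf [binary.MaxVarintLen32]byte
	for _, m := range msgs {
		n := binary.PutUvarint(lenBuf[:], uint64(len(m)))
		buf.Write(lenBuf[:n])
		buf.Write(m)
	}
	return buf.Bytes()
}

func unframe(r io.Reader) ([][]byte, error) {
	br := bufio.NewReader(r)
	var out [][]byte
	for {
		l, err := binary.ReadUvarint(br)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		msg := make([]byte, l)
		if _, err := io.ReadFull(br, msg); err != nil {
			return nil, err
		}
		out = append(out, msg)
	}
}

func main() {
	framed := frame([]byte("metric-family-1"), []byte("metric-family-2"))
	msgs, err := unframe(bytes.NewReader(framed))
	fmt.Println(len(msgs), err) // 2 <nil>
}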
+ CreatedTimestamp: timestamppb.Now(), + }, + expectedValues: []float64{1.0}, }, { - name: "valid counter/Ingestion disabled", - mf: makeMfWithCT(now), - ingestCT: false, - expectedScrapedValues: []float64{1.0}, + name: "enabled with CT on counter", + counterSample: &dto.Counter{ + Value: proto.Float64(1.0), + // Timestamp does not matter as long as it exists in this test. + CreatedTimestamp: timestamppb.Now(), + }, + enableCTZeroIngestion: true, + expectedValues: []float64{0.0, 1.0}, }, { - name: "created timestamp older than sample timestamp", - mf: func() *dto.MetricFamily { - mf := makeMfWithCT(now.Add(time.Hour)) - mf.Metric[0].TimestampMs = &nowMs - return mf - }(), - ingestCT: true, - expectedScrapedValues: []float64{1.0}, + name: "enabled without CT on counter", + counterSample: &dto.Counter{ + Value: proto.Float64(1.0), + }, + enableCTZeroIngestion: true, + expectedValues: []float64{1.0}, }, } { t.Run(tc.name, func(t *testing.T) { + app := &collectResultAppender{} scrapeManager, err := NewManager( - &Options{EnableCreatedTimestampIngestion: tc.ingestCT}, + &Options{ + EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion, + skipOffsetting: true, + }, log.NewLogfmtLogger(os.Stderr), &collectResultAppendable{app}, prometheus.NewRegistry(), @@ -785,18 +780,35 @@ func TestManagerScrapeCreatedTimestamp(t *testing.T) { require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ GlobalConfig: config.GlobalConfig{ - ScrapeInterval: model.Duration(5 * time.Second), - ScrapeTimeout: model.Duration(5 * time.Second), + // Disable regular scrapes. + ScrapeInterval: model.Duration(9999 * time.Minute), + ScrapeTimeout: model.Duration(5 * time.Second), + // Ensure proto is chosen. ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, }, ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, })) - // Start fake HTTP target to scrape returning a single metric. + once := sync.Once{} + // Start fake HTTP target to that allow one scrape only. server := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) - w.Write(serializeMetricFamily(t, tc.mf)) + fail := true + once.Do(func() { + fail = false + w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) + + var ctrType = dto.MetricType_COUNTER + w.Write(protoMarshalDelimited(t, &dto.MetricFamily{ + Name: proto.String(mName), + Type: &ctrType, + Metric: []*dto.Metric{{Counter: tc.counterSample}}, + })) + }) + + if fail { + w.WriteHeader(http.StatusInternalServerError) + } }), ) defer server.Close() @@ -807,27 +819,27 @@ func TestManagerScrapeCreatedTimestamp(t *testing.T) { // Add fake target directly into tsets + reload. Normally users would use // Manager.Run and wait for minimum 5s refresh interval. scrapeManager.updateTsets(map[string][]*targetgroup.Group{ - "test": { - { - Targets: []model.LabelSet{{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - }}, - }, - }, + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }}, + }}, }) scrapeManager.reload() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + + // Wait for one scrape. 
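// The test relies on countFloatSamples and getResultFloats, which this patch does
// not show. Conceptually they just filter the samples collected by
// collectResultAppender down to one metric name and return the count,
// respectively the float values, so the assertions can compare against [1] or
// [0, 1]. The self-contained toy below illustrates that idea over local types;
// the real helpers operate on collectResultAppender's internals and are not part
// of this patch.
package main

import "fmt"

type floatSample struct {
	metricName string
	t          int64
	f          float64
}

func getResultFloats(samples []floatSample, name string) (out []float64) {
	for _, s := range samples {
		if s.metricName == name {
			out = append(out, s.f)
		}
	}
	return out
}

func main() {
	samples := []floatSample{
		{"expected_counter", 1_000, 0},  // Synthetic CT zero sample.
		{"expected_counter", 10_000, 1}, // The scraped value.
		{"up", 10_000, 1},               // Other series are filtered out.
	}
	fmt.Println(getResultFloats(samples, "expected_counter"))      // [0 1]
	fmt.Println(len(getResultFloats(samples, "expected_counter"))) // 2, what countFloatSamples would report.
}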
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { - if countFloatSamples(app, *tc.mf.Name) < 1 { - return errors.New("expected at least one sample") + if countFloatSamples(app, mName) != len(tc.expectedValues) { + return fmt.Errorf("expected %v samples", tc.expectedValues) } return nil - }), "after 5 seconds") + }), "after 1 minute") scrapeManager.Stop() - require.Equal(t, tc.expectedScrapedValues, getResultFloats(app, *tc.mf.Name)) + require.Equal(t, tc.expectedValues, getResultFloats(app, mName)) }) } } diff --git a/scrape/scrape.go b/scrape/scrape.go index c5aaad6894..97684bd01c 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -31,7 +31,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/gogo/protobuf/types" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/version" @@ -107,9 +106,10 @@ type scrapeLoopOptions struct { interval time.Duration timeout time.Duration scrapeClassicHistograms bool - mrc []*relabel.Config - cache *scrapeCache - enableCompression bool + + mrc []*relabel.Config + cache *scrapeCache + enableCompression bool } const maxAheadTime = 10 * time.Minute @@ -169,12 +169,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed opts.interval, opts.timeout, opts.scrapeClassicHistograms, - options.EnableCreatedTimestampIngestion, + options.EnableCreatedTimestampZeroIngestion, options.ExtraMetrics, options.EnableMetadataStorage, opts.target, options.PassMetadataInContext, metrics, + options.skipOffsetting, ) } sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit)) @@ -789,7 +790,7 @@ type scrapeLoop struct { interval time.Duration timeout time.Duration scrapeClassicHistograms bool - scrapeCreatedTimestamps bool + enableCTZeroIngestion bool appender func(ctx context.Context) storage.Appender sampleMutator labelsMutator @@ -807,6 +808,8 @@ type scrapeLoop struct { appendMetadataToWAL bool metrics *scrapeMetrics + + skipOffsetting bool // For testability. } // scrapeCache tracks mappings of exposed metric strings to label sets and @@ -1079,12 +1082,13 @@ func newScrapeLoop(ctx context.Context, interval time.Duration, timeout time.Duration, scrapeClassicHistograms bool, - scrapeCreatedTimestamps bool, + enableCTZeroIngestion bool, reportExtraMetrics bool, appendMetadataToWAL bool, target *Target, passMetadataInContext bool, metrics *scrapeMetrics, + skipOffsetting bool, ) *scrapeLoop { if l == nil { l = log.NewNopLogger() @@ -1128,10 +1132,11 @@ func newScrapeLoop(ctx context.Context, interval: interval, timeout: timeout, scrapeClassicHistograms: scrapeClassicHistograms, - scrapeCreatedTimestamps: scrapeCreatedTimestamps, + enableCTZeroIngestion: enableCTZeroIngestion, reportExtraMetrics: reportExtraMetrics, appendMetadataToWAL: appendMetadataToWAL, metrics: metrics, + skipOffsetting: skipOffsetting, } sl.ctx, sl.cancel = context.WithCancel(ctx) @@ -1139,12 +1144,14 @@ func newScrapeLoop(ctx context.Context, } func (sl *scrapeLoop) run(errc chan<- error) { - select { - case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): - // Continue after a scraping offset. - case <-sl.ctx.Done(): - close(sl.stopped) - return + if !sl.skipOffsetting { + select { + case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): + // Continue after a scraping offset. 
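// Why skipOffsetting exists, as a self-contained sketch (not part of the proposed
// patch): each scrape loop normally sleeps for a deterministic, per-target jitter
// derived from the offset seed before its first scrape, so targets on the same
// interval don't all fire at once. Without the private skipOffsetting option the
// test above could wait an unpredictable fraction of the interval before the
// first scrape. The offset computation below is a simplified stand-in, not the
// real scraper.offset implementation.
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

func offset(interval time.Duration, target string, seed uint64) time.Duration {
	h := fnv.New64a()
	h.Write([]byte(target))
	return time.Duration((h.Sum64() ^ seed) % uint64(interval))
}

func main() {
	for _, target := range []string{"job1/10.0.0.1:9090", "job1/10.0.0.2:9090"} {
		fmt.Println(target, "first scrape delayed by", offset(15*time.Second, target, 42))
	}
}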
+ case <-sl.ctx.Done(): + close(sl.stopped) + return + } } var last time.Time @@ -1562,20 +1569,12 @@ loop: updateMetadata(lset, true) } - if sl.scrapeCreatedTimestamps { - var ct types.Timestamp - if p.CreatedTimestamp(&ct) { - if ctMs := (ct.Seconds * 1000) + int64(ct.Nanos/1_000_000); ctMs < t { - ref, err = app.AppendCreatedTimestamp(ref, lset, ctMs) - if err != nil { - if errors.Is(err, storage.ErrCreatedTimestampOutOfOrder) { - level.Debug(sl.l).Log("msg", storage.ErrCreatedTimestampOutOfOrder) - } else { - level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err) - break loop - } - } - } + if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil { + ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. + // CT is an experimental feature. For now, we don't need to fail the + // scrape on errors updating the created timestamp, log debug. + level.Debug(sl.l).Log("msg", "Error when updating metadata in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) } } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 167696590f..9a55249661 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -664,6 +664,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app nil, false, newTestScrapeMetrics(t), + false, ) } @@ -806,6 +807,7 @@ func TestScrapeLoopRun(t *testing.T) { nil, false, scrapeMetrics, + false, ) // The loop must terminate during the initial offset if the context @@ -951,6 +953,7 @@ func TestScrapeLoopMetadata(t *testing.T) { nil, false, scrapeMetrics, + false, ) defer cancel() diff --git a/storage/fanout.go b/storage/fanout.go index 862cf2906d..dad58181cb 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -202,14 +202,14 @@ func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metada return ref, nil } -func (f *fanoutAppender) AppendCreatedTimestamp(ref SeriesRef, l labels.Labels, t int64) (SeriesRef, error) { - ref, err := f.primary.AppendCreatedTimestamp(ref, l, t) +func (f *fanoutAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, t int64, ct int64) (SeriesRef, error) { + ref, err := f.primary.AppendCTZeroSample(ref, l, t, ct) if err != nil { return ref, err } for _, appender := range f.secondaries { - if _, err := appender.AppendCreatedTimestamp(ref, l, t); err != nil { + if _, err := appender.AppendCTZeroSample(ref, l, t, ct); err != nil { return 0, err } } diff --git a/storage/interface.go b/storage/interface.go index dfa8c7856e..bc2c765edb 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -43,7 +43,13 @@ var ( ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0") ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled") - ErrCreatedTimestampOutOfOrder = fmt.Errorf("created timestamp out of order, ignoring") + + // ErrOutOfOrderCT indicates failed append of CT to the storage + // due to CT being older the then newer sample. + // NOTE(bwplotka): This can be both an instrumentation failure or commonly expected + // behaviour, and we currently don't have a way to determine this. As a result + // it's recommended to ignore this error for now. 
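// The observable effect of the branch above, as a self-contained sketch (the
// recorder is a stand-in for storage.Appender, not part of the proposed patch):
// with the feature enabled and a CT reported by the parser, the appender first
// receives a synthetic (ct, 0) sample and then the real (t, v) sample, which is
// exactly the [0, 1] sequence TestManagerCTZeroIngestion expects; with the
// feature disabled or no CT present it receives only the real sample.
package main

import "fmt"

type sample struct {
	t int64
	v float64
}

type recorder struct{ got []sample }

func (r *recorder) AppendCTZeroSample(t, ct int64) { r.got = append(r.got, sample{t: ct, v: 0}) }
func (r *recorder) Append(t int64, v float64)      { r.got = append(r.got, sample{t: t, v: v}) }

func main() {
	var (
		enableCTZeroIngestion = true
		ct                    = int64(1_000) // From the parser, in milliseconds; 0 here stands for "no CT".
		t, v                  = int64(10_000), 1.0
		r                     recorder
	)
	if enableCTZeroIngestion && ct != 0 {
		r.AppendCTZeroSample(t, ct)
	}
	r.Append(t, v)
	fmt.Println(r.got) // [{1000 0} {10000 1}]
}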
+	ErrOutOfOrderCT               = fmt.Errorf("created timestamp out of order, ignoring")
 )
 
 // SeriesRef is a generic series reference. In prometheus it is either a
@@ -296,17 +302,22 @@ type MetadataUpdater interface {
 	UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error)
 }
 
-// CreatedTimestampAppender provides an interface for appending created timestamps to the storage.
+// CreatedTimestampAppender provides an interface for appending created timestamps (CT) to the storage.
 type CreatedTimestampAppender interface {
-	// AppendCreatedTimestamp adds an extra sample to the given series labels.
-	// The value of the appended sample is always zero, while the sample's timestamp
-	// is the one exposed by the target as created timestamp.
+	// AppendCTZeroSample adds a synthetic zero sample for the given ct timestamp,
+	// which will be associated with the given series and labels and with the
+	// incoming sample's t (timestamp). AppendCTZeroSample returns an error if the
+	// zero sample can't be appended, for example when ct is too old or when it
+	// would collide with the incoming sample (the sample has priority).
 	//
-	// Appending created timestamps is optional, that is because appending sythetic zeros
-	// should only happen if created timestamp respects the order of the samples, i.e. is not out-of-order.
+	// AppendCTZeroSample has to be called before the corresponding sample Append.
+	// A series reference number is returned, which can be used to modify the
+	// CT for the given series in the same or later transactions.
+	// Returned reference numbers are ephemeral and may be rejected in calls
+	// to AppendCTZeroSample() at any point.
 	//
-	// When AppendCreatedTimestamp decides to not append a sample, it should return an error that can be treated by the caller.
-	AppendCreatedTimestamp(ref SeriesRef, l labels.Labels, t int64) (SeriesRef, error)
+	// If the reference is 0 it must not be used for caching.
+	AppendCTZeroSample(ref SeriesRef, l labels.Labels, t int64, ct int64) (SeriesRef, error)
 }
 
 // SeriesSet contains a set of series.
diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index b1aac489d8..93eb28bcb1 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -954,8 +954,9 @@ func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Met
 	return 0, nil
 }
 
-// AppendCreatedTimestamp wasn't implemented for agent mode, yet.
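// A usage sketch for the contract documented in CreatedTimestampAppender above;
// ctZeroAppender and printAppender are made-up minimal types, not the real
// storage.Appender, and this is not part of the proposed patch. It encodes three
// points from the doc comment and the scrape-loop caller: AppendCTZeroSample is
// called before the matching Append, the returned series reference is reused for
// that Append, and a zero reference is never cached.
package main

import "fmt"

type seriesRef uint64

type ctZeroAppender interface {
	AppendCTZeroSample(ref seriesRef, series string, t, ct int64) (seriesRef, error)
	Append(ref seriesRef, series string, t int64, v float64) (seriesRef, error)
}

type printAppender struct{ next seriesRef }

func (p *printAppender) AppendCTZeroSample(ref seriesRef, series string, t, ct int64) (seriesRef, error) {
	if ref == 0 { // Unknown series: allocate a reference instead of caching 0.
		p.next++
		ref = p.next
	}
	fmt.Printf("ref=%d %s: zero sample at ct=%d (for sample at t=%d)\n", ref, series, ct, t)
	return ref, nil
}

func (p *printAppender) Append(ref seriesRef, series string, t int64, v float64) (seriesRef, error) {
	fmt.Printf("ref=%d %s: %v at t=%d\n", ref, series, v, t)
	return ref, nil
}

func main() {
	var app ctZeroAppender = &printAppender{}
	var ref seriesRef // Zero value: series not seen yet.

	ref, _ = app.AppendCTZeroSample(ref, `http_requests_total{job="app"}`, 10_000, 1_000)
	ref, _ = app.Append(ref, `http_requests_total{job="app"}`, 10_000, 42)
	_ = ref
}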
-func (a *appender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) { +func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t int64, ct int64) (storage.SeriesRef, error) { + // TODO(bwplotka): Implement + panic("to implement") return 0, nil } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 1e0976c3f1..df4f9b3035 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -878,3 +878,7 @@ func TestDBAllowOOOSamples(t *testing.T) { require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms") require.NoError(t, db.Close()) } + +func TestAgentAppender_AppendCTZeroSample(t *testing.T) { + t.Fatalf("TODO") +} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index f602f5ee9d..0a604188ad 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -6954,3 +6954,71 @@ Outer: require.NoError(t, writerErr) } + +func TestHeadAppender_AppendCTZeroSample(t *testing.T) { + t.Fatalf("TODO") + + // NOTE(bwplotka): We could reuse metadata test part as copied below: + updateMetadata := func(t *testing.T, app storage.Appender, s labels.Labels, m metadata.Metadata) { + _, err := app.UpdateMetadata(0, s, m) + require.NoError(t, err) + } + + db := newTestDB(t) + ctx := context.Background() + + // Add some series so we can append metadata to them. + app := db.Appender(ctx) + s1 := labels.FromStrings("a", "b") + s2 := labels.FromStrings("c", "d") + s3 := labels.FromStrings("e", "f") + s4 := labels.FromStrings("g", "h") + + for _, s := range []labels.Labels{s1, s2, s3, s4} { + _, err := app.Append(0, s, 0, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // Add a first round of metadata to the first three series. + // Re-take the Appender, as the previous Commit will have it closed. + m1 := metadata.Metadata{Type: "gauge", Unit: "unit_1", Help: "help_1"} + m2 := metadata.Metadata{Type: "gauge", Unit: "unit_2", Help: "help_2"} + m3 := metadata.Metadata{Type: "gauge", Unit: "unit_3", Help: "help_3"} + app = db.Appender(ctx) + updateMetadata(t, app, s1, m1) + updateMetadata(t, app, s2, m2) + updateMetadata(t, app, s3, m3) + require.NoError(t, app.Commit()) + + // Add a replicated metadata entry to the first series, + // a completely new metadata entry for the fourth series, + // and a changed metadata entry to the second series. + m4 := metadata.Metadata{Type: "counter", Unit: "unit_4", Help: "help_4"} + m5 := metadata.Metadata{Type: "counter", Unit: "unit_5", Help: "help_5"} + app = db.Appender(ctx) + updateMetadata(t, app, s1, m1) + updateMetadata(t, app, s4, m4) + updateMetadata(t, app, s2, m5) + require.NoError(t, app.Commit()) + + // Read the WAL to see if the disk storage format is correct. 
+ recs := readTestWAL(t, path.Join(db.Dir(), "wal")) + var gotMetadataBlocks [][]record.RefMetadata + for _, rec := range recs { + if mr, ok := rec.([]record.RefMetadata); ok { + gotMetadataBlocks = append(gotMetadataBlocks, mr) + } + } + + expectedMetadata := []record.RefMetadata{ + {Ref: 1, Type: record.GetMetricType(m1.Type), Unit: m1.Unit, Help: m1.Help}, + {Ref: 2, Type: record.GetMetricType(m2.Type), Unit: m2.Unit, Help: m2.Help}, + {Ref: 3, Type: record.GetMetricType(m3.Type), Unit: m3.Unit, Help: m3.Help}, + {Ref: 4, Type: record.GetMetricType(m4.Type), Unit: m4.Unit, Help: m4.Help}, + {Ref: 2, Type: record.GetMetricType(m5.Type), Unit: m5.Unit, Help: m5.Help}, + } + require.Len(t, gotMetadataBlocks, 2) + require.Equal(t, expectedMetadata[:3], gotMetadataBlocks[0]) + require.Equal(t, expectedMetadata[3:], gotMetadataBlocks[1]) +} diff --git a/tsdb/head.go b/tsdb/head.go index 3ff2bee716..8483573590 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -149,6 +149,10 @@ type HeadOptions struct { // EnableNativeHistograms enables the ingestion of native histograms. EnableNativeHistograms atomic.Bool + // EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample. + // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md + EnableCreatedTimestampZeroIngestion bool + ChunkRange int64 // ChunkDirRoot is the parent directory of the chunks directory. ChunkDirRoot string diff --git a/tsdb/head_append.go b/tsdb/head_append.go index b1507c611e..983fe7d52d 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -87,15 +87,15 @@ func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m return a.app.UpdateMetadata(ref, l, m) } -func (a *initAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels.Labels, t int64) (storage.SeriesRef, error) { +func (a *initAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t int64, ct int64) (storage.SeriesRef, error) { if a.app != nil { - return a.app.AppendCreatedTimestamp(ref, lset, t) + return a.app.AppendCTZeroSample(ref, lset, t, ct) } a.head.initTime(t) a.app = a.head.appender() - return a.app.AppendCreatedTimestamp(ref, lset, t) + return a.app.AppendCTZeroSample(ref, lset, t, ct) } // initTime initializes a head with the first timestamp. This only needs to be called @@ -383,10 +383,14 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 return storage.SeriesRef(s.ref), nil } -// AppendCreatedTimestamp appends a sample with 0 as its value when it makes sense to do so. -// For instance, it's not safe or efficient to append out-of-order created -// timestamp (e.g. we don't know if we didn't append zero for this created timestamp already). -func (a *headAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels.Labels, t int64) (storage.SeriesRef, error) { +// AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns +// error when sample can't be appended. See +// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation. 
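// A self-contained sketch of the lazy-initialization pattern that initAppender
// applies to the new AppendCTZeroSample above (simplified stand-in types, not
// part of the proposed patch): the head has no valid time range until something
// is written, so the wrapper initializes it from the first timestamp it sees and
// only then creates the real appender, exactly as the other append methods
// already do.
package main

import "fmt"

type head struct{ minTime *int64 }

func (h *head) initTime(t int64) { h.minTime = &t }

type appender struct{ h *head }

func (a *appender) appendCTZeroSample(t, ct int64) {
	fmt.Println("zero sample at", ct, "for incoming sample at", t)
}

type initAppender struct {
	h   *head
	app *appender
}

func (a *initAppender) AppendCTZeroSample(t, ct int64) {
	if a.app == nil {
		a.h.initTime(t) // The first write decides the head's initial time range.
		a.app = &appender{h: a.h}
	}
	a.app.appendCTZeroSample(t, ct)
}

func main() {
	ia := &initAppender{h: &head{}}
	ia.AppendCTZeroSample(10_000, 1_000)
	fmt.Println("head min time initialized to:", *ia.h.minTime)
}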
+func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t int64, ct int64) (storage.SeriesRef, error) { + if ct >= t { + return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring") + } + s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error @@ -396,8 +400,11 @@ func (a *headAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels } } + // Check if CT wouldn't be OOO vs samples we already might have for this series. + // NOTE(bwplotka): This will be often hit as it's expected for long living + // counters to share the same CT. s.Lock() - isOOO, _, err := s.appendable(t, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow) + isOOO, _, err := s.appendable(ct, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err == nil { s.pendingCommit = true } @@ -405,20 +412,14 @@ func (a *headAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels if err != nil { return 0, err } - if isOOO { - return storage.SeriesRef(s.ref), storage.ErrCreatedTimestampOutOfOrder + return storage.SeriesRef(s.ref), storage.ErrOutOfOrderCT } - if t > a.maxt { - a.maxt = t + if ct > a.maxt { + a.maxt = ct } - - a.samples = append(a.samples, record.RefSample{ - Ref: s.ref, - T: t, - V: 0.0, - }) + a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0}) a.sampleSeries = append(a.sampleSeries, s) return storage.SeriesRef(s.ref), nil }
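// A worked example of the acceptance rules implemented by
// headAppender.AppendCTZeroSample above, written as a standalone program rather
// than against the real tsdb types (which need a running head); it is not part
// of the proposed patch. Rules mirrored here: a CT that is not strictly older
// than the sample is rejected outright, a CT at or before data the series
// already holds is reported as out-of-order (the common, ignorable case for
// long-lived counters that keep the same CT), and otherwise a zero sample is
// written at the CT.
package main

import (
	"errors"
	"fmt"
)

var errOutOfOrderCT = errors.New("created timestamp out of order, ignoring")

// appendCTZeroSample mimics the decision tree; seriesMaxt is the newest
// timestamp already stored for the series (-1 stands for an empty series).
func appendCTZeroSample(seriesMaxt, t, ct int64) (int64, error) {
	if ct >= t {
		return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
	}
	if ct <= seriesMaxt {
		return 0, errOutOfOrderCT
	}
	return ct, nil // The synthetic zero sample would be appended at ct.
}

func main() {
	for _, c := range []struct{ seriesMaxt, t, ct int64 }{
		{-1, 10_000, 1_000},     // Empty series: zero sample accepted at ct=1000.
		{10_000, 20_000, 1_000}, // Long-lived counter, unchanged CT: out of order, skipped.
		{-1, 10_000, 10_000},    // ct == t: rejected, the real sample wins.
	} {
		at, err := appendCTZeroSample(c.seriesMaxt, c.t, c.ct)
		fmt.Printf("maxt=%d t=%d ct=%d -> zeroSampleAt=%d err=%v\n", c.seriesMaxt, c.t, c.ct, at, err)
	}
}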