From 5082655392fe2eb036f24da51b54ca050496368a Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 11 Dec 2023 05:43:42 -0300 Subject: [PATCH] Append Created Timestamps (#12733) * Append created timestamps. Signed-off-by: Arthur Silva Sens * Log when created timestamps are ignored Signed-off-by: Arthur Silva Sens * Proposed changes to Append CT PR. Changes: * Changed textparse Parser interface for consistency and robustness. * Changed CT interface to be more explicit and handle validation. * Simplified test, change scrapeManager to allow testability. * Added TODOs. Signed-off-by: bwplotka * Updates. Signed-off-by: bwplotka * Addressed comments. Signed-off-by: bwplotka * Refactor head_appender test Signed-off-by: Arthur Silva Sens * Fix linter issues Signed-off-by: Arthur Silva Sens * Use model.Sample in head appender test Signed-off-by: Arthur Silva Sens --------- Signed-off-by: Arthur Silva Sens Signed-off-by: bwplotka Co-authored-by: bwplotka --- cmd/prometheus/main.go | 14 ++- config/config.go | 9 +- model/textparse/interface.go | 10 +- model/textparse/openmetricsparse.go | 9 +- model/textparse/promparse.go | 9 +- model/textparse/protobufparse.go | 22 ++-- model/textparse/protobufparse_test.go | 34 +++--- scrape/helpers_test.go | 57 ++++++++++ scrape/manager.go | 6 + scrape/manager_test.go | 153 ++++++++++++++++++++++++++ scrape/scrape.go | 39 +++++-- scrape/scrape_test.go | 8 +- storage/fanout.go | 14 +++ storage/interface.go | 26 +++++ storage/remote/write.go | 5 + storage/remote/write_handler_test.go | 5 + tsdb/agent/db.go | 5 + tsdb/head.go | 4 + tsdb/head_append.go | 95 +++++++++++++--- tsdb/head_test.go | 91 +++++++++++++++ util/runutil/runutil.go | 37 +++++++ 21 files changed, 578 insertions(+), 74 deletions(-) create mode 100644 util/runutil/runutil.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index dfafe66c6..106f9d05c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -206,9 +206,15 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "native-histograms": c.tsdb.EnableNativeHistograms = true // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. - config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols - config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols + config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols + config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) + case "created-timestamp-zero-ingestion": + c.scrape.EnableCreatedTimestampZeroIngestion = true + // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. + config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols + config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols + level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) case "": continue case "promql-at-modifier", "promql-negative-offset": @@ -1449,6 +1455,10 @@ func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, return 0, tsdb.ErrNotReady } +func (n notReadyAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + return 0, tsdb.ErrNotReady +} + func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady } diff --git a/config/config.go b/config/config.go index b832ac9a1..ddcca84dc 100644 --- a/config/config.go +++ b/config/config.go @@ -454,12 +454,19 @@ var ( OpenMetricsText1_0_0: "application/openmetrics-text;version=1.0.0", } + // DefaultScrapeProtocols is the set of scrape protocols that will be proposed + // to scrape target, ordered by priority. DefaultScrapeProtocols = []ScrapeProtocol{ OpenMetricsText1_0_0, OpenMetricsText0_0_1, PrometheusText0_0_4, } - DefaultNativeHistogramScrapeProtocols = []ScrapeProtocol{ + + // DefaultProtoFirstScrapeProtocols is like DefaultScrapeProtocols, but it + // favors protobuf Prometheus exposition format. + // Used by default for certain feature-flags like + // "native-histograms" and "created-timestamp-zero-ingestion". + DefaultProtoFirstScrapeProtocols = []ScrapeProtocol{ PrometheusProto, OpenMetricsText1_0_0, OpenMetricsText0_0_1, diff --git a/model/textparse/interface.go b/model/textparse/interface.go index 2f5fdbc3b..df4259c85 100644 --- a/model/textparse/interface.go +++ b/model/textparse/interface.go @@ -16,8 +16,6 @@ package textparse import ( "mime" - "github.com/gogo/protobuf/types" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -66,10 +64,10 @@ type Parser interface { // retrieved (including the case where no exemplars exist at all). Exemplar(l *exemplar.Exemplar) bool - // CreatedTimestamp writes the created timestamp of the current sample - // into the passed timestamp. It returns false if no created timestamp - // exists or if the metric type does not support created timestamps. - CreatedTimestamp(ct *types.Timestamp) bool + // CreatedTimestamp returns the created timestamp (in milliseconds) for the + // current sample. It returns nil if it is unknown e.g. if it wasn't set, + // if the scrape protocol or metric type does not support created timestamps. + CreatedTimestamp() *int64 // Next advances the parser to the next sample. It returns false if no // more samples were read or an error occurred. diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index bb5075544..f0c383723 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -24,8 +24,6 @@ import ( "strings" "unicode/utf8" - "github.com/gogo/protobuf/types" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -213,9 +211,10 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool { return true } -// CreatedTimestamp returns false because OpenMetricsParser does not support created timestamps (yet). -func (p *OpenMetricsParser) CreatedTimestamp(_ *types.Timestamp) bool { - return false +// CreatedTimestamp returns nil as it's not implemented yet. +// TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980 +func (p *OpenMetricsParser) CreatedTimestamp() *int64 { + return nil } // nextToken returns the next token from the openMetricsLexer. diff --git a/model/textparse/promparse.go b/model/textparse/promparse.go index b3fa2d8a6..935801fb9 100644 --- a/model/textparse/promparse.go +++ b/model/textparse/promparse.go @@ -26,8 +26,6 @@ import ( "unicode/utf8" "unsafe" - "github.com/gogo/protobuf/types" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -247,9 +245,10 @@ func (p *PromParser) Exemplar(*exemplar.Exemplar) bool { return false } -// CreatedTimestamp returns false because PromParser does not support created timestamps. -func (p *PromParser) CreatedTimestamp(_ *types.Timestamp) bool { - return false +// CreatedTimestamp returns nil as it's not implemented yet. +// TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980 +func (p *PromParser) CreatedTimestamp() *int64 { + return nil } // nextToken returns the next token from the promlexer. It skips over tabs diff --git a/model/textparse/protobufparse.go b/model/textparse/protobufparse.go index 23afb5c59..baede7e1d 100644 --- a/model/textparse/protobufparse.go +++ b/model/textparse/protobufparse.go @@ -360,22 +360,26 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { return true } -func (p *ProtobufParser) CreatedTimestamp(ct *types.Timestamp) bool { - var foundCT *types.Timestamp +// CreatedTimestamp returns CT or nil if CT is not present or +// invalid (as timestamp e.g. negative value) on counters, summaries or histograms. +func (p *ProtobufParser) CreatedTimestamp() *int64 { + var ct *types.Timestamp switch p.mf.GetType() { case dto.MetricType_COUNTER: - foundCT = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp() + ct = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp() case dto.MetricType_SUMMARY: - foundCT = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp() + ct = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp() case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: - foundCT = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp() + ct = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp() default: } - if foundCT == nil { - return false + ctAsTime, err := types.TimestampFromProto(ct) + if err != nil { + // Errors means ct == nil or invalid timestamp, which we silently ignore. + return nil } - *ct = *foundCT - return true + ctMilis := ctAsTime.UnixMilli() + return &ctMilis } // Next advances the parser to the next "sample" (emulating the behavior of a diff --git a/model/textparse/protobufparse_test.go b/model/textparse/protobufparse_test.go index e062e64dd..c5b672dbc 100644 --- a/model/textparse/protobufparse_test.go +++ b/model/textparse/protobufparse_test.go @@ -21,7 +21,6 @@ import ( "testing" "github.com/gogo/protobuf/proto" - "github.com/gogo/protobuf/types" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/exemplar" @@ -630,7 +629,7 @@ func TestProtobufParse(t *testing.T) { shs *histogram.Histogram fhs *histogram.FloatHistogram e []exemplar.Exemplar - ct *types.Timestamp + ct int64 } inputBuf := createTestProtoBuf(t) @@ -1069,7 +1068,7 @@ func TestProtobufParse(t *testing.T) { { m: "test_counter_with_createdtimestamp", v: 42, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_counter_with_createdtimestamp", ), @@ -1085,7 +1084,7 @@ func TestProtobufParse(t *testing.T) { { m: "test_summary_with_createdtimestamp_count", v: 42, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_summary_with_createdtimestamp_count", ), @@ -1093,7 +1092,7 @@ func TestProtobufParse(t *testing.T) { { m: "test_summary_with_createdtimestamp_sum", v: 1.234, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_summary_with_createdtimestamp_sum", ), @@ -1108,7 +1107,7 @@ func TestProtobufParse(t *testing.T) { }, { m: "test_histogram_with_createdtimestamp", - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, shs: &histogram.Histogram{ CounterResetHint: histogram.UnknownCounterReset, PositiveSpans: []histogram.Span{}, @@ -1128,7 +1127,7 @@ func TestProtobufParse(t *testing.T) { }, { m: "test_gaugehistogram_with_createdtimestamp", - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, shs: &histogram.Histogram{ CounterResetHint: histogram.GaugeType, PositiveSpans: []histogram.Span{}, @@ -1887,7 +1886,7 @@ func TestProtobufParse(t *testing.T) { { // 83 m: "test_counter_with_createdtimestamp", v: 42, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_counter_with_createdtimestamp", ), @@ -1903,7 +1902,7 @@ func TestProtobufParse(t *testing.T) { { // 86 m: "test_summary_with_createdtimestamp_count", v: 42, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_summary_with_createdtimestamp_count", ), @@ -1911,7 +1910,7 @@ func TestProtobufParse(t *testing.T) { { // 87 m: "test_summary_with_createdtimestamp_sum", v: 1.234, - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, lset: labels.FromStrings( "__name__", "test_summary_with_createdtimestamp_sum", ), @@ -1926,7 +1925,7 @@ func TestProtobufParse(t *testing.T) { }, { // 90 m: "test_histogram_with_createdtimestamp", - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, shs: &histogram.Histogram{ CounterResetHint: histogram.UnknownCounterReset, PositiveSpans: []histogram.Span{}, @@ -1946,7 +1945,7 @@ func TestProtobufParse(t *testing.T) { }, { // 93 m: "test_gaugehistogram_with_createdtimestamp", - ct: &types.Timestamp{Seconds: 1, Nanos: 1}, + ct: 1000, shs: &histogram.Histogram{ CounterResetHint: histogram.GaugeType, PositiveSpans: []histogram.Span{}, @@ -1981,10 +1980,9 @@ func TestProtobufParse(t *testing.T) { m, ts, v := p.Series() var e exemplar.Exemplar - var ct types.Timestamp p.Metric(&res) eFound := p.Exemplar(&e) - ctFound := p.CreatedTimestamp(&ct) + ct := p.CreatedTimestamp() require.Equal(t, exp[i].m, string(m), "i: %d", i) if ts != nil { require.Equal(t, exp[i].t, *ts, "i: %d", i) @@ -2000,11 +1998,11 @@ func TestProtobufParse(t *testing.T) { require.Equal(t, exp[i].e[0], e, "i: %d", i) require.False(t, p.Exemplar(&e), "too many exemplars returned, i: %d", i) } - if exp[i].ct != nil { - require.True(t, ctFound, "i: %d", i) - require.Equal(t, exp[i].ct.String(), ct.String(), "i: %d", i) + if exp[i].ct != 0 { + require.NotNilf(t, ct, "i: %d", i) + require.Equal(t, exp[i].ct, *ct, "i: %d", i) } else { - require.False(t, ctFound, "i: %d", i) + require.Nilf(t, ct, "i: %d", i) } case EntryHistogram: diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index c580a5051..43ee0fcec 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -14,10 +14,18 @@ package scrape import ( + "bytes" "context" + "encoding/binary" "fmt" "math/rand" "strings" + "sync" + "testing" + + "github.com/gogo/protobuf/proto" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" @@ -50,6 +58,10 @@ func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.M return 0, nil } +func (a nopAppender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) { + return 0, nil +} + func (a nopAppender) Commit() error { return nil } func (a nopAppender) Rollback() error { return nil } @@ -65,9 +77,19 @@ type histogramSample struct { fh *histogram.FloatHistogram } +type collectResultAppendable struct { + *collectResultAppender +} + +func (a *collectResultAppendable) Appender(_ context.Context) storage.Appender { + return a +} + // collectResultAppender records all samples that were added through the appender. // It can be used as its zero value or be backed by another appender it writes samples through. type collectResultAppender struct { + mtx sync.Mutex + next storage.Appender resultFloats []floatSample pendingFloats []floatSample @@ -82,6 +104,8 @@ type collectResultAppender struct { } func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() a.pendingFloats = append(a.pendingFloats, floatSample{ metric: lset, t: t, @@ -103,6 +127,8 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels } func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() a.pendingExemplars = append(a.pendingExemplars, e) if a.next == nil { return 0, nil @@ -112,6 +138,8 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L } func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t}) if a.next == nil { return 0, nil @@ -121,6 +149,8 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels. } func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() a.pendingMetadata = append(a.pendingMetadata, m) if ref == 0 { ref = storage.SeriesRef(rand.Uint64()) @@ -132,7 +162,13 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L return a.next.UpdateMetadata(ref, l, m) } +func (a *collectResultAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + return a.Append(ref, l, ct, 0.0) +} + func (a *collectResultAppender) Commit() error { + a.mtx.Lock() + defer a.mtx.Unlock() a.resultFloats = append(a.resultFloats, a.pendingFloats...) a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...) a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...) @@ -148,6 +184,8 @@ func (a *collectResultAppender) Commit() error { } func (a *collectResultAppender) Rollback() error { + a.mtx.Lock() + defer a.mtx.Unlock() a.rolledbackFloats = a.pendingFloats a.rolledbackHistograms = a.pendingHistograms a.pendingFloats = nil @@ -171,3 +209,22 @@ func (a *collectResultAppender) String() string { } return sb.String() } + +// protoMarshalDelimited marshals a MetricFamily into a delimited +// Prometheus proto exposition format bytes (known as 'encoding=delimited`) +// +// See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers +func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte { + t.Helper() + + protoBuf, err := proto.Marshal(mf) + require.NoError(t, err) + + varintBuf := make([]byte, binary.MaxVarintLen32) + varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf))) + + buf := &bytes.Buffer{} + buf.Write(varintBuf[:varintLength]) + buf.Write(protoBuf) + return buf.Bytes() +} diff --git a/scrape/manager.go b/scrape/manager.go index 3b70e48a1..faa46f54d 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -78,9 +78,15 @@ type Options struct { EnableMetadataStorage bool // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration + // Option to enable the ingestion of the created timestamp as a synthetic zero sample. + // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md + EnableCreatedTimestampZeroIngestion bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption + + // private option for testability. + skipOffsetting bool } // Manager maintains a set of scrape pools and manages start/stop cycles diff --git a/scrape/manager_test.go b/scrape/manager_test.go index a689c469d..524424269 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -15,14 +15,23 @@ package scrape import ( "context" + "fmt" "net/http" + "net/http/httptest" + "net/url" + "os" "strconv" + "sync" "testing" "time" + "github.com/go-kit/log" + "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/yaml.v2" "github.com/prometheus/prometheus/config" @@ -30,6 +39,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/util/runutil" ) func TestPopulateLabels(t *testing.T) { @@ -714,3 +724,146 @@ scrape_configs: reload(scrapeManager, cfg2) require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) } + +// TestManagerCTZeroIngestion tests scrape manager for CT cases. +func TestManagerCTZeroIngestion(t *testing.T) { + const mName = "expected_counter" + + for _, tc := range []struct { + name string + counterSample *dto.Counter + enableCTZeroIngestion bool + + expectedValues []float64 + }{ + { + name: "disabled with CT on counter", + counterSample: &dto.Counter{ + Value: proto.Float64(1.0), + // Timestamp does not matter as long as it exists in this test. + CreatedTimestamp: timestamppb.Now(), + }, + expectedValues: []float64{1.0}, + }, + { + name: "enabled with CT on counter", + counterSample: &dto.Counter{ + Value: proto.Float64(1.0), + // Timestamp does not matter as long as it exists in this test. + CreatedTimestamp: timestamppb.Now(), + }, + enableCTZeroIngestion: true, + expectedValues: []float64{0.0, 1.0}, + }, + { + name: "enabled without CT on counter", + counterSample: &dto.Counter{ + Value: proto.Float64(1.0), + }, + enableCTZeroIngestion: true, + expectedValues: []float64{1.0}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + app := &collectResultAppender{} + scrapeManager, err := NewManager( + &Options{ + EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion, + skipOffsetting: true, + }, + log.NewLogfmtLogger(os.Stderr), + &collectResultAppendable{app}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ + GlobalConfig: config.GlobalConfig{ + // Disable regular scrapes. + ScrapeInterval: model.Duration(9999 * time.Minute), + ScrapeTimeout: model.Duration(5 * time.Second), + // Ensure the proto is chosen. We need proto as it's the only protocol + // with the CT parsing support. + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + })) + + once := sync.Once{} + // Start fake HTTP target to that allow one scrape only. + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fail := true + once.Do(func() { + fail = false + w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) + + ctrType := dto.MetricType_COUNTER + w.Write(protoMarshalDelimited(t, &dto.MetricFamily{ + Name: proto.String(mName), + Type: &ctrType, + Metric: []*dto.Metric{{Counter: tc.counterSample}}, + })) + }) + + if fail { + w.WriteHeader(http.StatusInternalServerError) + } + }), + ) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + require.NoError(t, err) + + // Add fake target directly into tsets + reload. Normally users would use + // Manager.Run and wait for minimum 5s refresh interval. + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }}, + }}, + }) + scrapeManager.reload() + + // Wait for one scrape. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { + if countFloatSamples(app, mName) != len(tc.expectedValues) { + return fmt.Errorf("expected %v samples", tc.expectedValues) + } + return nil + }), "after 1 minute") + scrapeManager.Stop() + + require.Equal(t, tc.expectedValues, getResultFloats(app, mName)) + }) + } +} + +func countFloatSamples(a *collectResultAppender, expectedMetricName string) (count int) { + a.mtx.Lock() + defer a.mtx.Unlock() + + for _, f := range a.resultFloats { + if f.metric.Get(model.MetricNameLabel) == expectedMetricName { + count++ + } + } + return count +} + +func getResultFloats(app *collectResultAppender, expectedMetricName string) (result []float64) { + app.mtx.Lock() + defer app.mtx.Unlock() + + for _, f := range app.resultFloats { + if f.metric.Get(model.MetricNameLabel) == expectedMetricName { + result = append(result, f.f) + } + } + return result +} diff --git a/scrape/scrape.go b/scrape/scrape.go index 9a0ba1d00..be27a5d48 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -106,9 +106,10 @@ type scrapeLoopOptions struct { interval time.Duration timeout time.Duration scrapeClassicHistograms bool - mrc []*relabel.Config - cache *scrapeCache - enableCompression bool + + mrc []*relabel.Config + cache *scrapeCache + enableCompression bool } const maxAheadTime = 10 * time.Minute @@ -168,11 +169,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed opts.interval, opts.timeout, opts.scrapeClassicHistograms, + options.EnableCreatedTimestampZeroIngestion, options.ExtraMetrics, options.EnableMetadataStorage, opts.target, options.PassMetadataInContext, metrics, + options.skipOffsetting, ) } sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit)) @@ -787,6 +790,7 @@ type scrapeLoop struct { interval time.Duration timeout time.Duration scrapeClassicHistograms bool + enableCTZeroIngestion bool appender func(ctx context.Context) storage.Appender sampleMutator labelsMutator @@ -804,6 +808,8 @@ type scrapeLoop struct { appendMetadataToWAL bool metrics *scrapeMetrics + + skipOffsetting bool // For testability. } // scrapeCache tracks mappings of exposed metric strings to label sets and @@ -1076,11 +1082,13 @@ func newScrapeLoop(ctx context.Context, interval time.Duration, timeout time.Duration, scrapeClassicHistograms bool, + enableCTZeroIngestion bool, reportExtraMetrics bool, appendMetadataToWAL bool, target *Target, passMetadataInContext bool, metrics *scrapeMetrics, + skipOffsetting bool, ) *scrapeLoop { if l == nil { l = log.NewNopLogger() @@ -1124,9 +1132,11 @@ func newScrapeLoop(ctx context.Context, interval: interval, timeout: timeout, scrapeClassicHistograms: scrapeClassicHistograms, + enableCTZeroIngestion: enableCTZeroIngestion, reportExtraMetrics: reportExtraMetrics, appendMetadataToWAL: appendMetadataToWAL, metrics: metrics, + skipOffsetting: skipOffsetting, } sl.ctx, sl.cancel = context.WithCancel(ctx) @@ -1134,12 +1144,14 @@ func newScrapeLoop(ctx context.Context, } func (sl *scrapeLoop) run(errc chan<- error) { - select { - case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): - // Continue after a scraping offset. - case <-sl.ctx.Done(): - close(sl.stopped) - return + if !sl.skipOffsetting { + select { + case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): + // Continue after a scraping offset. + case <-sl.ctx.Done(): + close(sl.stopped) + return + } } var last time.Time @@ -1557,6 +1569,15 @@ loop: updateMetadata(lset, true) } + if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil { + ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. + // CT is an experimental feature. For now, we don't need to fail the + // scrape on errors updating the created timestamp, log debug. + level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) + } + } + if isHistogram { if h != nil { ref, err = app.AppendHistogram(ref, lset, t, h, nil) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 8dee1f2c7..90578f2e9 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -660,9 +660,11 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app false, false, false, + false, nil, false, newTestScrapeMetrics(t), + false, ) } @@ -801,9 +803,11 @@ func TestScrapeLoopRun(t *testing.T) { false, false, false, + false, nil, false, scrapeMetrics, + false, ) // The loop must terminate during the initial offset if the context @@ -945,9 +949,11 @@ func TestScrapeLoopMetadata(t *testing.T) { false, false, false, + false, nil, false, scrapeMetrics, + false, ) defer cancel() @@ -2377,7 +2383,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { runTest(acceptHeader(config.DefaultScrapeProtocols)) protobufParsing = true - runTest(acceptHeader(config.DefaultNativeHistogramScrapeProtocols)) + runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols)) } func TestTargetScrapeScrapeCancel(t *testing.T) { diff --git a/storage/fanout.go b/storage/fanout.go index 33257046f..a9a3f904b 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -202,6 +202,20 @@ func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metada return ref, nil } +func (f *fanoutAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64) (SeriesRef, error) { + ref, err := f.primary.AppendCTZeroSample(ref, l, t, ct) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.AppendCTZeroSample(ref, l, t, ct); err != nil { + return 0, err + } + } + return ref, nil +} + func (f *fanoutAppender) Commit() (err error) { err = f.primary.Commit() diff --git a/storage/interface.go b/storage/interface.go index 2b1b6a63e..675e44c0e 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -43,6 +43,13 @@ var ( ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0") ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled") + + // ErrOutOfOrderCT indicates failed append of CT to the storage + // due to CT being older the then newer sample. + // NOTE(bwplotka): This can be both an instrumentation failure or commonly expected + // behaviour, and we currently don't have a way to determine this. As a result + // it's recommended to ignore this error for now. + ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring") ) // SeriesRef is a generic series reference. In prometheus it is either a @@ -237,6 +244,7 @@ type Appender interface { ExemplarAppender HistogramAppender MetadataUpdater + CreatedTimestampAppender } // GetRef is an extra interface on Appenders used by downstream projects @@ -294,6 +302,24 @@ type MetadataUpdater interface { UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) } +// CreatedTimestampAppender provides an interface for appending CT to storage. +type CreatedTimestampAppender interface { + // AppendCTZeroSample adds synthetic zero sample for the given ct timestamp, + // which will be associated with given series, labels and the incoming + // sample's t (timestamp). AppendCTZeroSample returns error if zero sample can't be + // appended, for example when ct is too old, or when it would collide with + // incoming sample (sample has priority). + // + // AppendCTZeroSample has to be called before the corresponding sample Append. + // A series reference number is returned which can be used to modify the + // CT for the given series in the same or later transactions. + // Returned reference numbers are ephemeral and may be rejected in calls + // to AppendCTZeroSample() at any point. + // + // If the reference is 0 it must not be used for caching. + AppendCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64) (SeriesRef, error) +} + // SeriesSet contains a set of series. type SeriesSet interface { Next() bool diff --git a/storage/remote/write.go b/storage/remote/write.go index 237f8caa9..66455cb4d 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -303,6 +303,11 @@ func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, return 0, nil } +func (t *timestampTracker) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { + // AppendCTZeroSample is no-op for remote-write for now. + return 0, nil +} + // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 839009b2a..fd5b34ecd 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -339,3 +339,8 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ // UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now. return 0, nil } + +func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { + // AppendCTZeroSample is no-op for remote-write for now. + return 0, nil +} diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 6afef1389..557fb7854 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -962,6 +962,11 @@ func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Met return 0, nil } +func (a *appender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) { + // TODO(bwplotka): Wire metadata in the Agent's appender. + return 0, nil +} + // Commit submits the collected samples and purges the batch. func (a *appender) Commit() error { if err := a.log(); err != nil { diff --git a/tsdb/head.go b/tsdb/head.go index 3ff2bee71..848357359 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -149,6 +149,10 @@ type HeadOptions struct { // EnableNativeHistograms enables the ingestion of native histograms. EnableNativeHistograms atomic.Bool + // EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample. + // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md + EnableCreatedTimestampZeroIngestion bool + ChunkRange int64 // ChunkDirRoot is the parent directory of the chunks directory. ChunkDirRoot string diff --git a/tsdb/head_append.go b/tsdb/head_append.go index be53a4f3f..afb461afe 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -87,6 +87,17 @@ func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m return a.app.UpdateMetadata(ref, l, m) } +func (a *initAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) { + if a.app != nil { + return a.app.AppendCTZeroSample(ref, lset, t, ct) + } + + a.head.initTime(t) + a.app = a.head.appender() + + return a.app.AppendCTZeroSample(ref, lset, t, ct) +} + // initTime initializes a head with the first timestamp. This only needs to be called // for a completely fresh head with an empty WAL. func (h *Head) initTime(t int64) { @@ -319,28 +330,11 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { - // Ensure no empty labels have gotten through. - lset = lset.WithoutEmpty() - if lset.IsEmpty() { - return 0, errors.Wrap(ErrInvalidSample, "empty labelset") - } - - if l, dup := lset.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) - } - - var created bool var err error - s, created, err = a.head.getOrCreate(lset.Hash(), lset) + s, err = a.getOrCreate(lset) if err != nil { return 0, err } - if created { - a.series = append(a.series, record.RefSeries{ - Ref: s.ref, - Labels: lset, - }) - } } if value.IsStaleNaN(v) { @@ -389,6 +383,71 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 return storage.SeriesRef(s.ref), nil } +// AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns +// error when sample can't be appended. See +// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation. +func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) { + if ct >= t { + return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring") + } + + s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s == nil { + var err error + s, err = a.getOrCreate(lset) + if err != nil { + return 0, err + } + } + + // Check if CT wouldn't be OOO vs samples we already might have for this series. + // NOTE(bwplotka): This will be often hit as it's expected for long living + // counters to share the same CT. + s.Lock() + isOOO, _, err := s.appendable(ct, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow) + if err == nil { + s.pendingCommit = true + } + s.Unlock() + if err != nil { + return 0, err + } + if isOOO { + return storage.SeriesRef(s.ref), storage.ErrOutOfOrderCT + } + + if ct > a.maxt { + a.maxt = ct + } + a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0}) + a.sampleSeries = append(a.sampleSeries, s) + return storage.SeriesRef(s.ref), nil +} + +func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) { + // Ensure no empty labels have gotten through. + lset = lset.WithoutEmpty() + if lset.IsEmpty() { + return nil, errors.Wrap(ErrInvalidSample, "empty labelset") + } + if l, dup := lset.HasDuplicateLabelNames(); dup { + return nil, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) + } + var created bool + var err error + s, created, err := a.head.getOrCreate(lset.Hash(), lset) + if err != nil { + return nil, err + } + if created { + a.series = append(a.series, record.RefSeries{ + Ref: s.ref, + Labels: lset, + }) + } + return s, nil +} + // appendable checks whether the given sample is valid for appending to the series. (if we return false and no error) // The sample belongs to the out of order chunk if we return true and no error. // An error signifies the sample cannot be handled. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index d444e1496..535647d3a 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -33,6 +33,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "golang.org/x/sync/errgroup" @@ -5641,3 +5642,93 @@ func TestPostingsCardinalityStats(t *testing.T) { // Using cache. require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1)) } + +func TestHeadAppender_AppendCTZeroSample(t *testing.T) { + type appendableSamples struct { + ts int64 + val float64 + ct int64 + } + for _, tc := range []struct { + name string + appendableSamples []appendableSamples + expectedSamples []model.Sample + }{ + { + name: "In order ct+normal sample", + appendableSamples: []appendableSamples{ + {ts: 100, val: 10, ct: 1}, + }, + expectedSamples: []model.Sample{ + {Timestamp: 1, Value: 0}, + {Timestamp: 100, Value: 10}, + }, + }, + { + name: "Consecutive appends with same ct ignore ct", + appendableSamples: []appendableSamples{ + {ts: 100, val: 10, ct: 1}, + {ts: 101, val: 10, ct: 1}, + }, + expectedSamples: []model.Sample{ + {Timestamp: 1, Value: 0}, + {Timestamp: 100, Value: 10}, + {Timestamp: 101, Value: 10}, + }, + }, + { + name: "Consecutive appends with newer ct do not ignore ct", + appendableSamples: []appendableSamples{ + {ts: 100, val: 10, ct: 1}, + {ts: 102, val: 10, ct: 101}, + }, + expectedSamples: []model.Sample{ + {Timestamp: 1, Value: 0}, + {Timestamp: 100, Value: 10}, + {Timestamp: 101, Value: 0}, + {Timestamp: 102, Value: 10}, + }, + }, + { + name: "CT equals to previous sample timestamp is ignored", + appendableSamples: []appendableSamples{ + {ts: 100, val: 10, ct: 1}, + {ts: 101, val: 10, ct: 100}, + }, + expectedSamples: []model.Sample{ + {Timestamp: 1, Value: 0}, + {Timestamp: 100, Value: 10}, + {Timestamp: 101, Value: 10}, + }, + }, + } { + h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + defer func() { + require.NoError(t, h.Close()) + }() + a := h.Appender(context.Background()) + lbls := labels.FromStrings("foo", "bar") + for _, sample := range tc.appendableSamples { + _, err := a.AppendCTZeroSample(0, lbls, sample.ts, sample.ct) + require.NoError(t, err) + _, err = a.Append(0, lbls, sample.ts, sample.val) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + + q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) + it := s.Iterator(nil) + for _, sample := range tc.expectedSamples { + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value := it.At() + require.Equal(t, sample.Timestamp, model.Time(timestamp)) + require.Equal(t, sample.Value, model.SampleValue(value)) + } + require.Equal(t, chunkenc.ValNone, it.Next()) + } +} diff --git a/util/runutil/runutil.go b/util/runutil/runutil.go new file mode 100644 index 000000000..5a77c332b --- /dev/null +++ b/util/runutil/runutil.go @@ -0,0 +1,37 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copied from https://github.com/efficientgo/core/blob/a21078e2c723b69e05f95c65dbc5058712b4edd8/runutil/runutil.go#L39 +// and adjusted. + +package runutil + +import "time" + +// Retry executes f every interval seconds until timeout or no error is returned from f. +func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error { + tick := time.NewTicker(interval) + defer tick.Stop() + + var err error + for { + if err = f(); err == nil { + return nil + } + select { + case <-stopc: + return err + case <-tick.C: + } + } +}