Append Created Timestamps (#12733)

* Append created timestamps.

Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>

* Log when created timestamps are ignored

Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>

* Proposed changes to Append CT PR.

Changes:

* Changed textparse Parser interface for consistency and robustness.
* Changed CT interface to be more explicit and handle validation.
* Simplified test, change scrapeManager to allow testability.
* Added TODOs.

Signed-off-by: bwplotka <bwplotka@gmail.com>

* Updates.

Signed-off-by: bwplotka <bwplotka@gmail.com>

* Addressed comments.

Signed-off-by: bwplotka <bwplotka@gmail.com>

* Refactor head_appender test

Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>

* Fix linter issues

Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>

* Use model.Sample in head appender test

Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>

---------

Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>
Signed-off-by: bwplotka <bwplotka@gmail.com>
Co-authored-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
Arthur Silva Sens 2023-12-11 05:43:42 -03:00 committed by GitHub
parent db915b07cb
commit 5082655392
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 578 additions and 74 deletions

View file

@ -206,9 +206,15 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "native-histograms": case "native-histograms":
c.tsdb.EnableNativeHistograms = true c.tsdb.EnableNativeHistograms = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "created-timestamp-zero-ingestion":
c.scrape.EnableCreatedTimestampZeroIngestion = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "": case "":
continue continue
case "promql-at-modifier", "promql-negative-offset": case "promql-at-modifier", "promql-negative-offset":
@ -1449,6 +1455,10 @@ func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels,
return 0, tsdb.ErrNotReady return 0, tsdb.ErrNotReady
} }
func (n notReadyAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }
func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady } func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }

View file

@ -454,12 +454,19 @@ var (
OpenMetricsText1_0_0: "application/openmetrics-text;version=1.0.0", OpenMetricsText1_0_0: "application/openmetrics-text;version=1.0.0",
} }
// DefaultScrapeProtocols is the set of scrape protocols that will be proposed
// to scrape target, ordered by priority.
DefaultScrapeProtocols = []ScrapeProtocol{ DefaultScrapeProtocols = []ScrapeProtocol{
OpenMetricsText1_0_0, OpenMetricsText1_0_0,
OpenMetricsText0_0_1, OpenMetricsText0_0_1,
PrometheusText0_0_4, PrometheusText0_0_4,
} }
DefaultNativeHistogramScrapeProtocols = []ScrapeProtocol{
// DefaultProtoFirstScrapeProtocols is like DefaultScrapeProtocols, but it
// favors protobuf Prometheus exposition format.
// Used by default for certain feature-flags like
// "native-histograms" and "created-timestamp-zero-ingestion".
DefaultProtoFirstScrapeProtocols = []ScrapeProtocol{
PrometheusProto, PrometheusProto,
OpenMetricsText1_0_0, OpenMetricsText1_0_0,
OpenMetricsText0_0_1, OpenMetricsText0_0_1,

View file

@ -16,8 +16,6 @@ package textparse
import ( import (
"mime" "mime"
"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
@ -66,10 +64,10 @@ type Parser interface {
// retrieved (including the case where no exemplars exist at all). // retrieved (including the case where no exemplars exist at all).
Exemplar(l *exemplar.Exemplar) bool Exemplar(l *exemplar.Exemplar) bool
// CreatedTimestamp writes the created timestamp of the current sample // CreatedTimestamp returns the created timestamp (in milliseconds) for the
// into the passed timestamp. It returns false if no created timestamp // current sample. It returns nil if it is unknown e.g. if it wasn't set,
// exists or if the metric type does not support created timestamps. // if the scrape protocol or metric type does not support created timestamps.
CreatedTimestamp(ct *types.Timestamp) bool CreatedTimestamp() *int64
// Next advances the parser to the next sample. It returns false if no // Next advances the parser to the next sample. It returns false if no
// more samples were read or an error occurred. // more samples were read or an error occurred.

View file

@ -24,8 +24,6 @@ import (
"strings" "strings"
"unicode/utf8" "unicode/utf8"
"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
@ -213,9 +211,10 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
return true return true
} }
// CreatedTimestamp returns false because OpenMetricsParser does not support created timestamps (yet). // CreatedTimestamp returns nil as it's not implemented yet.
func (p *OpenMetricsParser) CreatedTimestamp(_ *types.Timestamp) bool { // TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980
return false func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
return nil
} }
// nextToken returns the next token from the openMetricsLexer. // nextToken returns the next token from the openMetricsLexer.

View file

@ -26,8 +26,6 @@ import (
"unicode/utf8" "unicode/utf8"
"unsafe" "unsafe"
"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
@ -247,9 +245,10 @@ func (p *PromParser) Exemplar(*exemplar.Exemplar) bool {
return false return false
} }
// CreatedTimestamp returns false because PromParser does not support created timestamps. // CreatedTimestamp returns nil as it's not implemented yet.
func (p *PromParser) CreatedTimestamp(_ *types.Timestamp) bool { // TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980
return false func (p *PromParser) CreatedTimestamp() *int64 {
return nil
} }
// nextToken returns the next token from the promlexer. It skips over tabs // nextToken returns the next token from the promlexer. It skips over tabs

View file

@ -360,22 +360,26 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
return true return true
} }
func (p *ProtobufParser) CreatedTimestamp(ct *types.Timestamp) bool { // CreatedTimestamp returns CT or nil if CT is not present or
var foundCT *types.Timestamp // invalid (as timestamp e.g. negative value) on counters, summaries or histograms.
func (p *ProtobufParser) CreatedTimestamp() *int64 {
var ct *types.Timestamp
switch p.mf.GetType() { switch p.mf.GetType() {
case dto.MetricType_COUNTER: case dto.MetricType_COUNTER:
foundCT = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp() ct = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp()
case dto.MetricType_SUMMARY: case dto.MetricType_SUMMARY:
foundCT = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp() ct = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp()
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
foundCT = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp() ct = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp()
default: default:
} }
if foundCT == nil { ctAsTime, err := types.TimestampFromProto(ct)
return false if err != nil {
// An error means ct == nil or an invalid timestamp, which we silently ignore.
return nil
} }
*ct = *foundCT ctMilis := ctAsTime.UnixMilli()
return true return &ctMilis
} }
// Next advances the parser to the next "sample" (emulating the behavior of a // Next advances the parser to the next "sample" (emulating the behavior of a

View file

@ -21,7 +21,6 @@ import (
"testing" "testing"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
@ -630,7 +629,7 @@ func TestProtobufParse(t *testing.T) {
shs *histogram.Histogram shs *histogram.Histogram
fhs *histogram.FloatHistogram fhs *histogram.FloatHistogram
e []exemplar.Exemplar e []exemplar.Exemplar
ct *types.Timestamp ct int64
} }
inputBuf := createTestProtoBuf(t) inputBuf := createTestProtoBuf(t)
@ -1069,7 +1068,7 @@ func TestProtobufParse(t *testing.T) {
{ {
m: "test_counter_with_createdtimestamp", m: "test_counter_with_createdtimestamp",
v: 42, v: 42,
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
lset: labels.FromStrings( lset: labels.FromStrings(
"__name__", "test_counter_with_createdtimestamp", "__name__", "test_counter_with_createdtimestamp",
), ),
@ -1085,7 +1084,7 @@ func TestProtobufParse(t *testing.T) {
{ {
m: "test_summary_with_createdtimestamp_count", m: "test_summary_with_createdtimestamp_count",
v: 42, v: 42,
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
lset: labels.FromStrings( lset: labels.FromStrings(
"__name__", "test_summary_with_createdtimestamp_count", "__name__", "test_summary_with_createdtimestamp_count",
), ),
@ -1093,7 +1092,7 @@ func TestProtobufParse(t *testing.T) {
{ {
m: "test_summary_with_createdtimestamp_sum", m: "test_summary_with_createdtimestamp_sum",
v: 1.234, v: 1.234,
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
lset: labels.FromStrings( lset: labels.FromStrings(
"__name__", "test_summary_with_createdtimestamp_sum", "__name__", "test_summary_with_createdtimestamp_sum",
), ),
@ -1108,7 +1107,7 @@ func TestProtobufParse(t *testing.T) {
}, },
{ {
m: "test_histogram_with_createdtimestamp", m: "test_histogram_with_createdtimestamp",
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
shs: &histogram.Histogram{ shs: &histogram.Histogram{
CounterResetHint: histogram.UnknownCounterReset, CounterResetHint: histogram.UnknownCounterReset,
PositiveSpans: []histogram.Span{}, PositiveSpans: []histogram.Span{},
@ -1128,7 +1127,7 @@ func TestProtobufParse(t *testing.T) {
}, },
{ {
m: "test_gaugehistogram_with_createdtimestamp", m: "test_gaugehistogram_with_createdtimestamp",
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
shs: &histogram.Histogram{ shs: &histogram.Histogram{
CounterResetHint: histogram.GaugeType, CounterResetHint: histogram.GaugeType,
PositiveSpans: []histogram.Span{}, PositiveSpans: []histogram.Span{},
@ -1887,7 +1886,7 @@ func TestProtobufParse(t *testing.T) {
{ // 83 { // 83
m: "test_counter_with_createdtimestamp", m: "test_counter_with_createdtimestamp",
v: 42, v: 42,
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
lset: labels.FromStrings( lset: labels.FromStrings(
"__name__", "test_counter_with_createdtimestamp", "__name__", "test_counter_with_createdtimestamp",
), ),
@ -1903,7 +1902,7 @@ func TestProtobufParse(t *testing.T) {
{ // 86 { // 86
m: "test_summary_with_createdtimestamp_count", m: "test_summary_with_createdtimestamp_count",
v: 42, v: 42,
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
lset: labels.FromStrings( lset: labels.FromStrings(
"__name__", "test_summary_with_createdtimestamp_count", "__name__", "test_summary_with_createdtimestamp_count",
), ),
@ -1911,7 +1910,7 @@ func TestProtobufParse(t *testing.T) {
{ // 87 { // 87
m: "test_summary_with_createdtimestamp_sum", m: "test_summary_with_createdtimestamp_sum",
v: 1.234, v: 1.234,
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
lset: labels.FromStrings( lset: labels.FromStrings(
"__name__", "test_summary_with_createdtimestamp_sum", "__name__", "test_summary_with_createdtimestamp_sum",
), ),
@ -1926,7 +1925,7 @@ func TestProtobufParse(t *testing.T) {
}, },
{ // 90 { // 90
m: "test_histogram_with_createdtimestamp", m: "test_histogram_with_createdtimestamp",
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
shs: &histogram.Histogram{ shs: &histogram.Histogram{
CounterResetHint: histogram.UnknownCounterReset, CounterResetHint: histogram.UnknownCounterReset,
PositiveSpans: []histogram.Span{}, PositiveSpans: []histogram.Span{},
@ -1946,7 +1945,7 @@ func TestProtobufParse(t *testing.T) {
}, },
{ // 93 { // 93
m: "test_gaugehistogram_with_createdtimestamp", m: "test_gaugehistogram_with_createdtimestamp",
ct: &types.Timestamp{Seconds: 1, Nanos: 1}, ct: 1000,
shs: &histogram.Histogram{ shs: &histogram.Histogram{
CounterResetHint: histogram.GaugeType, CounterResetHint: histogram.GaugeType,
PositiveSpans: []histogram.Span{}, PositiveSpans: []histogram.Span{},
@ -1981,10 +1980,9 @@ func TestProtobufParse(t *testing.T) {
m, ts, v := p.Series() m, ts, v := p.Series()
var e exemplar.Exemplar var e exemplar.Exemplar
var ct types.Timestamp
p.Metric(&res) p.Metric(&res)
eFound := p.Exemplar(&e) eFound := p.Exemplar(&e)
ctFound := p.CreatedTimestamp(&ct) ct := p.CreatedTimestamp()
require.Equal(t, exp[i].m, string(m), "i: %d", i) require.Equal(t, exp[i].m, string(m), "i: %d", i)
if ts != nil { if ts != nil {
require.Equal(t, exp[i].t, *ts, "i: %d", i) require.Equal(t, exp[i].t, *ts, "i: %d", i)
@ -2000,11 +1998,11 @@ func TestProtobufParse(t *testing.T) {
require.Equal(t, exp[i].e[0], e, "i: %d", i) require.Equal(t, exp[i].e[0], e, "i: %d", i)
require.False(t, p.Exemplar(&e), "too many exemplars returned, i: %d", i) require.False(t, p.Exemplar(&e), "too many exemplars returned, i: %d", i)
} }
if exp[i].ct != nil { if exp[i].ct != 0 {
require.True(t, ctFound, "i: %d", i) require.NotNilf(t, ct, "i: %d", i)
require.Equal(t, exp[i].ct.String(), ct.String(), "i: %d", i) require.Equal(t, exp[i].ct, *ct, "i: %d", i)
} else { } else {
require.False(t, ctFound, "i: %d", i) require.Nilf(t, ct, "i: %d", i)
} }
case EntryHistogram: case EntryHistogram:

View file

@ -14,10 +14,18 @@
package scrape package scrape
import ( import (
"bytes"
"context" "context"
"encoding/binary"
"fmt" "fmt"
"math/rand" "math/rand"
"strings" "strings"
"sync"
"testing"
"github.com/gogo/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
@ -50,6 +58,10 @@ func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.M
return 0, nil return 0, nil
} }
func (a nopAppender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) {
return 0, nil
}
func (a nopAppender) Commit() error { return nil } func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil } func (a nopAppender) Rollback() error { return nil }
@ -65,9 +77,19 @@ type histogramSample struct {
fh *histogram.FloatHistogram fh *histogram.FloatHistogram
} }
type collectResultAppendable struct {
*collectResultAppender
}
func (a *collectResultAppendable) Appender(_ context.Context) storage.Appender {
return a
}
// collectResultAppender records all samples that were added through the appender. // collectResultAppender records all samples that were added through the appender.
// It can be used as its zero value or be backed by another appender it writes samples through. // It can be used as its zero value or be backed by another appender it writes samples through.
type collectResultAppender struct { type collectResultAppender struct {
mtx sync.Mutex
next storage.Appender next storage.Appender
resultFloats []floatSample resultFloats []floatSample
pendingFloats []floatSample pendingFloats []floatSample
@ -82,6 +104,8 @@ type collectResultAppender struct {
} }
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingFloats = append(a.pendingFloats, floatSample{ a.pendingFloats = append(a.pendingFloats, floatSample{
metric: lset, metric: lset,
t: t, t: t,
@ -103,6 +127,8 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels
} }
func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingExemplars = append(a.pendingExemplars, e) a.pendingExemplars = append(a.pendingExemplars, e)
if a.next == nil { if a.next == nil {
return 0, nil return 0, nil
@ -112,6 +138,8 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
} }
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t}) a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t})
if a.next == nil { if a.next == nil {
return 0, nil return 0, nil
@ -121,6 +149,8 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.
} }
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingMetadata = append(a.pendingMetadata, m) a.pendingMetadata = append(a.pendingMetadata, m)
if ref == 0 { if ref == 0 {
ref = storage.SeriesRef(rand.Uint64()) ref = storage.SeriesRef(rand.Uint64())
@ -132,7 +162,13 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L
return a.next.UpdateMetadata(ref, l, m) return a.next.UpdateMetadata(ref, l, m)
} }
func (a *collectResultAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) {
return a.Append(ref, l, ct, 0.0)
}
func (a *collectResultAppender) Commit() error { func (a *collectResultAppender) Commit() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.resultFloats = append(a.resultFloats, a.pendingFloats...) a.resultFloats = append(a.resultFloats, a.pendingFloats...)
a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...) a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...) a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
@ -148,6 +184,8 @@ func (a *collectResultAppender) Commit() error {
} }
func (a *collectResultAppender) Rollback() error { func (a *collectResultAppender) Rollback() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.rolledbackFloats = a.pendingFloats a.rolledbackFloats = a.pendingFloats
a.rolledbackHistograms = a.pendingHistograms a.rolledbackHistograms = a.pendingHistograms
a.pendingFloats = nil a.pendingFloats = nil
@ -171,3 +209,22 @@ func (a *collectResultAppender) String() string {
} }
return sb.String() return sb.String()
} }
// protoMarshalDelimited marshals a MetricFamily into a delimited
// Prometheus proto exposition format bytes (known as `encoding=delimited`).
//
// See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte {
t.Helper()
protoBuf, err := proto.Marshal(mf)
require.NoError(t, err)
varintBuf := make([]byte, binary.MaxVarintLen32)
varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))
buf := &bytes.Buffer{}
buf.Write(varintBuf[:varintLength])
buf.Write(protoBuf)
return buf.Bytes()
}

View file

@ -78,9 +78,15 @@ type Options struct {
EnableMetadataStorage bool EnableMetadataStorage bool
// Option to increase the interval used by scrape manager to throttle target groups updates. // Option to increase the interval used by scrape manager to throttle target groups updates.
DiscoveryReloadInterval model.Duration DiscoveryReloadInterval model.Duration
// Option to enable the ingestion of the created timestamp as a synthetic zero sample.
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
EnableCreatedTimestampZeroIngestion bool
// Optional HTTP client options to use when scraping. // Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption HTTPClientOptions []config_util.HTTPClientOption
// private option for testability.
skipOffsetting bool
} }
// Manager maintains a set of scrape pools and manages start/stop cycles // Manager maintains a set of scrape pools and manages start/stop cycles

View file

@ -15,14 +15,23 @@ package scrape
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"net/http/httptest"
"net/url"
"os"
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
@ -30,6 +39,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/util/runutil"
) )
func TestPopulateLabels(t *testing.T) { func TestPopulateLabels(t *testing.T) {
@ -714,3 +724,146 @@ scrape_configs:
reload(scrapeManager, cfg2) reload(scrapeManager, cfg2)
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
} }
// TestManagerCTZeroIngestion tests scrape manager for CT cases.
func TestManagerCTZeroIngestion(t *testing.T) {
const mName = "expected_counter"
for _, tc := range []struct {
name string
counterSample *dto.Counter
enableCTZeroIngestion bool
expectedValues []float64
}{
{
name: "disabled with CT on counter",
counterSample: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
expectedValues: []float64{1.0},
},
{
name: "enabled with CT on counter",
counterSample: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
enableCTZeroIngestion: true,
expectedValues: []float64{0.0, 1.0},
},
{
name: "enabled without CT on counter",
counterSample: &dto.Counter{
Value: proto.Float64(1.0),
},
enableCTZeroIngestion: true,
expectedValues: []float64{1.0},
},
} {
t.Run(tc.name, func(t *testing.T) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
skipOffsetting: true,
},
log.NewLogfmtLogger(os.Stderr),
&collectResultAppendable{app},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
// Disable regular scrapes.
ScrapeInterval: model.Duration(9999 * time.Minute),
ScrapeTimeout: model.Duration(5 * time.Second),
// Ensure the proto is chosen. We need proto as it's the only protocol
// with the CT parsing support.
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
once := sync.Once{}
// Start a fake HTTP target that allows only one scrape.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fail := true
once.Do(func() {
fail = false
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
ctrType := dto.MetricType_COUNTER
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Counter: tc.counterSample}},
}))
})
if fail {
w.WriteHeader(http.StatusInternalServerError)
}
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
scrapeManager.reload()
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
if countFloatSamples(app, mName) != len(tc.expectedValues) {
return fmt.Errorf("expected %v samples", tc.expectedValues)
}
return nil
}), "after 1 minute")
scrapeManager.Stop()
require.Equal(t, tc.expectedValues, getResultFloats(app, mName))
})
}
}
func countFloatSamples(a *collectResultAppender, expectedMetricName string) (count int) {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, f := range a.resultFloats {
if f.metric.Get(model.MetricNameLabel) == expectedMetricName {
count++
}
}
return count
}
func getResultFloats(app *collectResultAppender, expectedMetricName string) (result []float64) {
app.mtx.Lock()
defer app.mtx.Unlock()
for _, f := range app.resultFloats {
if f.metric.Get(model.MetricNameLabel) == expectedMetricName {
result = append(result, f.f)
}
}
return result
}

View file

@ -106,9 +106,10 @@ type scrapeLoopOptions struct {
interval time.Duration interval time.Duration
timeout time.Duration timeout time.Duration
scrapeClassicHistograms bool scrapeClassicHistograms bool
mrc []*relabel.Config
cache *scrapeCache mrc []*relabel.Config
enableCompression bool cache *scrapeCache
enableCompression bool
} }
const maxAheadTime = 10 * time.Minute const maxAheadTime = 10 * time.Minute
@ -168,11 +169,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.interval, opts.interval,
opts.timeout, opts.timeout,
opts.scrapeClassicHistograms, opts.scrapeClassicHistograms,
options.EnableCreatedTimestampZeroIngestion,
options.ExtraMetrics, options.ExtraMetrics,
options.EnableMetadataStorage, options.EnableMetadataStorage,
opts.target, opts.target,
options.PassMetadataInContext, options.PassMetadataInContext,
metrics, metrics,
options.skipOffsetting,
) )
} }
sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit)) sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
@ -787,6 +790,7 @@ type scrapeLoop struct {
interval time.Duration interval time.Duration
timeout time.Duration timeout time.Duration
scrapeClassicHistograms bool scrapeClassicHistograms bool
enableCTZeroIngestion bool
appender func(ctx context.Context) storage.Appender appender func(ctx context.Context) storage.Appender
sampleMutator labelsMutator sampleMutator labelsMutator
@ -804,6 +808,8 @@ type scrapeLoop struct {
appendMetadataToWAL bool appendMetadataToWAL bool
metrics *scrapeMetrics metrics *scrapeMetrics
skipOffsetting bool // For testability.
} }
// scrapeCache tracks mappings of exposed metric strings to label sets and // scrapeCache tracks mappings of exposed metric strings to label sets and
@ -1076,11 +1082,13 @@ func newScrapeLoop(ctx context.Context,
interval time.Duration, interval time.Duration,
timeout time.Duration, timeout time.Duration,
scrapeClassicHistograms bool, scrapeClassicHistograms bool,
enableCTZeroIngestion bool,
reportExtraMetrics bool, reportExtraMetrics bool,
appendMetadataToWAL bool, appendMetadataToWAL bool,
target *Target, target *Target,
passMetadataInContext bool, passMetadataInContext bool,
metrics *scrapeMetrics, metrics *scrapeMetrics,
skipOffsetting bool,
) *scrapeLoop { ) *scrapeLoop {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
@ -1124,9 +1132,11 @@ func newScrapeLoop(ctx context.Context,
interval: interval, interval: interval,
timeout: timeout, timeout: timeout,
scrapeClassicHistograms: scrapeClassicHistograms, scrapeClassicHistograms: scrapeClassicHistograms,
enableCTZeroIngestion: enableCTZeroIngestion,
reportExtraMetrics: reportExtraMetrics, reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL, appendMetadataToWAL: appendMetadataToWAL,
metrics: metrics, metrics: metrics,
skipOffsetting: skipOffsetting,
} }
sl.ctx, sl.cancel = context.WithCancel(ctx) sl.ctx, sl.cancel = context.WithCancel(ctx)
@ -1134,12 +1144,14 @@ func newScrapeLoop(ctx context.Context,
} }
func (sl *scrapeLoop) run(errc chan<- error) { func (sl *scrapeLoop) run(errc chan<- error) {
select { if !sl.skipOffsetting {
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)): select {
// Continue after a scraping offset. case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
case <-sl.ctx.Done(): // Continue after a scraping offset.
close(sl.stopped) case <-sl.ctx.Done():
return close(sl.stopped)
return
}
} }
var last time.Time var last time.Time
@ -1557,6 +1569,15 @@ loop:
updateMetadata(lset, true) updateMetadata(lset, true)
} }
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
}
}
if isHistogram { if isHistogram {
if h != nil { if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil) ref, err = app.AppendHistogram(ref, lset, t, h, nil)

View file

@ -660,9 +660,11 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
false, false,
false, false,
false, false,
false,
nil, nil,
false, false,
newTestScrapeMetrics(t), newTestScrapeMetrics(t),
false,
) )
} }
@ -801,9 +803,11 @@ func TestScrapeLoopRun(t *testing.T) {
false, false,
false, false,
false, false,
false,
nil, nil,
false, false,
scrapeMetrics, scrapeMetrics,
false,
) )
// The loop must terminate during the initial offset if the context // The loop must terminate during the initial offset if the context
@ -945,9 +949,11 @@ func TestScrapeLoopMetadata(t *testing.T) {
false, false,
false, false,
false, false,
false,
nil, nil,
false, false,
scrapeMetrics, scrapeMetrics,
false,
) )
defer cancel() defer cancel()
@ -2377,7 +2383,7 @@ func TestTargetScraperScrapeOK(t *testing.T) {
runTest(acceptHeader(config.DefaultScrapeProtocols)) runTest(acceptHeader(config.DefaultScrapeProtocols))
protobufParsing = true protobufParsing = true
runTest(acceptHeader(config.DefaultNativeHistogramScrapeProtocols)) runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols))
} }
func TestTargetScrapeScrapeCancel(t *testing.T) { func TestTargetScrapeScrapeCancel(t *testing.T) {

View file

@ -202,6 +202,20 @@ func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metada
return ref, nil return ref, nil
} }
// AppendCTZeroSample implements storage.CreatedTimestampAppender by fanning
// the synthetic zero sample out to the primary and all secondary appenders.
func (f *fanoutAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64) (SeriesRef, error) {
	// The primary goes first; its returned reference is the one handed back
	// to the caller and forwarded to the secondaries.
	var err error
	if ref, err = f.primary.AppendCTZeroSample(ref, l, t, ct); err != nil {
		return ref, err
	}
	// Mirror the zero sample to every secondary. Secondary references are
	// discarded; a secondary failure invalidates the returned reference.
	for _, app := range f.secondaries {
		if _, err = app.AppendCTZeroSample(ref, l, t, ct); err != nil {
			return 0, err
		}
	}
	return ref, nil
}
func (f *fanoutAppender) Commit() (err error) { func (f *fanoutAppender) Commit() (err error) {
err = f.primary.Commit() err = f.primary.Commit()

View file

@ -43,6 +43,13 @@ var (
ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength)
ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0") ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0")
ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled") ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled")
// ErrOutOfOrderCT indicates a failed append of a CT to the storage
// due to the CT being older than the latest sample for the series.
// NOTE(bwplotka): This can be both an instrumentation failure or commonly expected
// behaviour, and we currently don't have a way to determine this. As a result
// it's recommended to ignore this error for now.
ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring")
) )
// SeriesRef is a generic series reference. In prometheus it is either a // SeriesRef is a generic series reference. In prometheus it is either a
@ -237,6 +244,7 @@ type Appender interface {
ExemplarAppender ExemplarAppender
HistogramAppender HistogramAppender
MetadataUpdater MetadataUpdater
CreatedTimestampAppender
} }
// GetRef is an extra interface on Appenders used by downstream projects // GetRef is an extra interface on Appenders used by downstream projects
@ -294,6 +302,24 @@ type MetadataUpdater interface {
UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error)
} }
// CreatedTimestampAppender provides an interface for appending the created
// timestamp (CT) to storage as a synthetic zero-value sample.
type CreatedTimestampAppender interface {
	// AppendCTZeroSample adds synthetic zero sample for the given ct timestamp,
	// which will be associated with given series, labels and the incoming
	// sample's t (timestamp). AppendCTZeroSample returns error if zero sample can't be
	// appended, for example when ct is too old, or when it would collide with
	// incoming sample (sample has priority).
	//
	// AppendCTZeroSample has to be called before the corresponding sample Append.
	// A series reference number is returned which can be used to modify the
	// CT for the given series in the same or later transactions.
	// Returned reference numbers are ephemeral and may be rejected in calls
	// to AppendCTZeroSample() at any point.
	//
	// If the reference is 0 it must not be used for caching.
	AppendCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64) (SeriesRef, error)
}
// SeriesSet contains a set of series. // SeriesSet contains a set of series.
type SeriesSet interface { type SeriesSet interface {
Next() bool Next() bool

View file

@ -303,6 +303,11 @@ func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels,
return 0, nil return 0, nil
} }
// AppendCTZeroSample implements storage.CreatedTimestampAppender.
func (t *timestampTracker) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) {
	// AppendCTZeroSample is no-op for remote-write for now: created-timestamp
	// zero samples are neither counted nor forwarded here.
	return 0, nil
}
// Commit implements storage.Appender. // Commit implements storage.Appender.
func (t *timestampTracker) Commit() error { func (t *timestampTracker) Commit() error {
t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)

View file

@ -339,3 +339,8 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _
// UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now. // UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now.
return 0, nil return 0, nil
} }
// AppendCTZeroSample implements storage.CreatedTimestampAppender.
func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) {
	// AppendCTZeroSample is no-op for remote-write (where mockAppendable is
	// being used to test) for now.
	return 0, nil
}

View file

@ -962,6 +962,11 @@ func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Met
return 0, nil return 0, nil
} }
// AppendCTZeroSample implements storage.CreatedTimestampAppender as a no-op.
func (a *appender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) {
	// TODO(bwplotka): Wire created-timestamp (CT) zero-sample ingestion into the
	// Agent's appender. (The original TODO mentioned metadata; that looks like a
	// copy-paste slip — this method is about CT, not metadata.)
	return 0, nil
}
// Commit submits the collected samples and purges the batch. // Commit submits the collected samples and purges the batch.
func (a *appender) Commit() error { func (a *appender) Commit() error {
if err := a.log(); err != nil { if err := a.log(); err != nil {

View file

@ -149,6 +149,10 @@ type HeadOptions struct {
// EnableNativeHistograms enables the ingestion of native histograms. // EnableNativeHistograms enables the ingestion of native histograms.
EnableNativeHistograms atomic.Bool EnableNativeHistograms atomic.Bool
// EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample.
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
EnableCreatedTimestampZeroIngestion bool
ChunkRange int64 ChunkRange int64
// ChunkDirRoot is the parent directory of the chunks directory. // ChunkDirRoot is the parent directory of the chunks directory.
ChunkDirRoot string ChunkDirRoot string

View file

@ -87,6 +87,17 @@ func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m
return a.app.UpdateMetadata(ref, l, m) return a.app.UpdateMetadata(ref, l, m)
} }
// AppendCTZeroSample lazily initializes the head with the first observed
// timestamp before delegating the CT zero sample to the real appender.
func (a *initAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
	if a.app == nil {
		// First write: establish the head's time range from the sample's
		// timestamp, then create the underlying appender.
		a.head.initTime(t)
		a.app = a.head.appender()
	}
	return a.app.AppendCTZeroSample(ref, lset, t, ct)
}
// initTime initializes a head with the first timestamp. This only needs to be called // initTime initializes a head with the first timestamp. This only needs to be called
// for a completely fresh head with an empty WAL. // for a completely fresh head with an empty WAL.
func (h *Head) initTime(t int64) { func (h *Head) initTime(t int64) {
@ -319,28 +330,11 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil { if s == nil {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
}
var created bool
var err error var err error
s, created, err = a.head.getOrCreate(lset.Hash(), lset) s, err = a.getOrCreate(lset)
if err != nil { if err != nil {
return 0, err return 0, err
} }
if created {
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
Labels: lset,
})
}
} }
if value.IsStaleNaN(v) { if value.IsStaleNaN(v) {
@ -389,6 +383,71 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
return storage.SeriesRef(s.ref), nil return storage.SeriesRef(s.ref), nil
} }
// AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns
// error when sample can't be appended. See
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
	// The zero sample must strictly precede the real sample; equal or newer
	// CTs are rejected outright (sample has priority).
	if ct >= t {
		return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
	}
	// Resolve the series from the cached reference, creating it if unknown.
	s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		var err error
		s, err = a.getOrCreate(lset)
		if err != nil {
			return 0, err
		}
	}
	// Check if CT wouldn't be OOO vs samples we already might have for this series.
	// NOTE(bwplotka): This will be often hit as it's expected for long living
	// counters to share the same CT.
	s.Lock()
	isOOO, _, err := s.appendable(ct, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow)
	if err == nil {
		// NOTE(review): pendingCommit is set under the series lock, mirroring
		// the appendable-check handling elsewhere in this appender — presumably
		// to keep the series alive until Commit/Rollback; confirm against Append.
		s.pendingCommit = true
	}
	s.Unlock()
	if err != nil {
		return 0, err
	}
	if isOOO {
		// Out-of-order CT is a common, expected case (long-lived counters keep
		// the same CT); callers are advised to ignore this sentinel error.
		return storage.SeriesRef(s.ref), storage.ErrOutOfOrderCT
	}
	// Track the appender's max time and queue the synthetic zero sample for Commit.
	if ct > a.maxt {
		a.maxt = ct
	}
	a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
	a.sampleSeries = append(a.sampleSeries, s)
	return storage.SeriesRef(s.ref), nil
}
// getOrCreate validates lset and returns the in-memory series for it, creating
// the series (and queueing a WAL series record for Commit) if it does not
// exist yet.
func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) {
	// Ensure no empty labels have gotten through.
	lset = lset.WithoutEmpty()
	if lset.IsEmpty() {
		return nil, errors.Wrap(ErrInvalidSample, "empty labelset")
	}
	if l, dup := lset.HasDuplicateLabelNames(); dup {
		return nil, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
	}
	// The redundant `var created bool; var err error` declarations that
	// preceded this short declaration were removed: `:=` already declares them.
	s, created, err := a.head.getOrCreate(lset.Hash(), lset)
	if err != nil {
		return nil, err
	}
	if created {
		// Queue a series record so the newly created series is persisted on Commit.
		a.series = append(a.series, record.RefSeries{
			Ref:    s.ref,
			Labels: lset,
		})
	}
	return s, nil
}
// appendable checks whether the given sample is valid for appending to the series. (if we return false and no error) // appendable checks whether the given sample is valid for appending to the series. (if we return false and no error)
// The sample belongs to the out of order chunk if we return true and no error. // The sample belongs to the out of order chunk if we return true and no error.
// An error signifies the sample cannot be handled. // An error signifies the sample cannot be handled.

View file

@ -33,6 +33,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/atomic" "go.uber.org/atomic"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -5641,3 +5642,93 @@ func TestPostingsCardinalityStats(t *testing.T) {
// Using cache. // Using cache.
require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1)) require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1))
} }
// TestHeadAppender_AppendCTZeroSample verifies that AppendCTZeroSample injects
// a synthetic zero sample at the created timestamp and that out-of-order or
// colliding CTs are ignored. Each case runs as its own subtest so a failing
// case is identifiable and its head is closed promptly (the original deferred
// Close inside the loop only ran at function exit).
func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
	type appendableSamples struct {
		ts  int64
		val float64
		ct  int64
	}
	for _, tc := range []struct {
		name              string
		appendableSamples []appendableSamples
		expectedSamples   []model.Sample
	}{
		{
			name: "In order ct+normal sample",
			appendableSamples: []appendableSamples{
				{ts: 100, val: 10, ct: 1},
			},
			expectedSamples: []model.Sample{
				{Timestamp: 1, Value: 0},
				{Timestamp: 100, Value: 10},
			},
		},
		{
			name: "Consecutive appends with same ct ignore ct",
			appendableSamples: []appendableSamples{
				{ts: 100, val: 10, ct: 1},
				{ts: 101, val: 10, ct: 1},
			},
			expectedSamples: []model.Sample{
				{Timestamp: 1, Value: 0},
				{Timestamp: 100, Value: 10},
				{Timestamp: 101, Value: 10},
			},
		},
		{
			name: "Consecutive appends with newer ct do not ignore ct",
			appendableSamples: []appendableSamples{
				{ts: 100, val: 10, ct: 1},
				{ts: 102, val: 10, ct: 101},
			},
			expectedSamples: []model.Sample{
				{Timestamp: 1, Value: 0},
				{Timestamp: 100, Value: 10},
				{Timestamp: 101, Value: 0},
				{Timestamp: 102, Value: 10},
			},
		},
		{
			name: "CT equals to previous sample timestamp is ignored",
			appendableSamples: []appendableSamples{
				{ts: 100, val: 10, ct: 1},
				{ts: 101, val: 10, ct: 100},
			},
			expectedSamples: []model.Sample{
				{Timestamp: 1, Value: 0},
				{Timestamp: 100, Value: 10},
				{Timestamp: 101, Value: 10},
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
			defer func() {
				require.NoError(t, h.Close())
			}()
			a := h.Appender(context.Background())
			lbls := labels.FromStrings("foo", "bar")
			for _, sample := range tc.appendableSamples {
				// The CT zero sample must be appended before its sample.
				_, err := a.AppendCTZeroSample(0, lbls, sample.ts, sample.ct)
				require.NoError(t, err)
				_, err = a.Append(0, lbls, sample.ts, sample.val)
				require.NoError(t, err)
			}
			require.NoError(t, a.Commit())

			q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64)
			require.NoError(t, err)
			defer func() {
				require.NoError(t, q.Close())
			}()
			ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
			require.True(t, ss.Next())
			s := ss.At()
			require.False(t, ss.Next())
			it := s.Iterator(nil)
			for _, sample := range tc.expectedSamples {
				require.Equal(t, chunkenc.ValFloat, it.Next())
				timestamp, value := it.At()
				require.Equal(t, sample.Timestamp, model.Time(timestamp))
				require.Equal(t, sample.Value, model.SampleValue(value))
			}
			require.Equal(t, chunkenc.ValNone, it.Next())
		})
	}
}

37
util/runutil/runutil.go Normal file
View file

@ -0,0 +1,37 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copied from https://github.com/efficientgo/core/blob/a21078e2c723b69e05f95c65dbc5058712b4edd8/runutil/runutil.go#L39
// and adjusted.
package runutil
import "time"
// Retry executes f every interval seconds until timeout or no error is returned from f.
func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
tick := time.NewTicker(interval)
defer tick.Stop()
var err error
for {
if err = f(); err == nil {
return nil
}
select {
case <-stopc:
return err
case <-tick.C:
}
}
}