mirror of
https://github.com/prometheus/prometheus.git
synced 2024-09-19 23:37:31 -07:00
Histogram CT Zero ingestion
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
This commit is contained in:
parent
00d23c9689
commit
f295e9dc51
|
@ -1599,6 +1599,10 @@ func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels
|
||||||
return 0, tsdb.ErrNotReady
|
return 0, tsdb.ErrNotReady
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n notReadyAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
return 0, tsdb.ErrNotReady
|
||||||
|
}
|
||||||
|
|
||||||
func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
return 0, tsdb.ErrNotReady
|
return 0, tsdb.ErrNotReady
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,10 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a nopAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
|
func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
@ -78,9 +82,10 @@ func equalFloatSamples(a, b floatSample) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
type histogramSample struct {
|
type histogramSample struct {
|
||||||
t int64
|
metric labels.Labels
|
||||||
h *histogram.Histogram
|
t int64
|
||||||
fh *histogram.FloatHistogram
|
h *histogram.Histogram
|
||||||
|
fh *histogram.FloatHistogram
|
||||||
}
|
}
|
||||||
|
|
||||||
type collectResultAppendable struct {
|
type collectResultAppendable struct {
|
||||||
|
@ -146,7 +151,7 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
|
||||||
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
a.mtx.Lock()
|
a.mtx.Lock()
|
||||||
defer a.mtx.Unlock()
|
defer a.mtx.Unlock()
|
||||||
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t})
|
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, metric: l})
|
||||||
if a.next == nil {
|
if a.next == nil {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
@ -154,6 +159,13 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.
|
||||||
return a.next.AppendHistogram(ref, l, t, h, fh)
|
return a.next.AppendHistogram(ref, l, t, h, fh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *collectResultAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
if h != nil {
|
||||||
|
return a.AppendHistogram(ref, l, ct, &histogram.Histogram{}, nil)
|
||||||
|
}
|
||||||
|
return a.AppendHistogram(ref, l, ct, nil, &histogram.FloatHistogram{})
|
||||||
|
}
|
||||||
|
|
||||||
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
a.mtx.Lock()
|
a.mtx.Lock()
|
||||||
defer a.mtx.Unlock()
|
defer a.mtx.Unlock()
|
||||||
|
|
|
@ -39,8 +39,10 @@ import (
|
||||||
"github.com/prometheus/prometheus/discovery"
|
"github.com/prometheus/prometheus/discovery"
|
||||||
_ "github.com/prometheus/prometheus/discovery/file"
|
_ "github.com/prometheus/prometheus/discovery/file"
|
||||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/model/relabel"
|
"github.com/prometheus/prometheus/model/relabel"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
"github.com/prometheus/prometheus/util/runutil"
|
"github.com/prometheus/prometheus/util/runutil"
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
@ -858,6 +860,178 @@ func TestManagerCTZeroIngestion(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// generateTestHistogram generates the same thing as tsdbutil.GenerateTestHistogram,
|
||||||
|
// but in the form of dto.Histogram.
|
||||||
|
func generateTestHistogram(i int) *dto.Histogram {
|
||||||
|
helper := tsdbutil.GenerateTestHistogram(i)
|
||||||
|
h := &dto.Histogram{}
|
||||||
|
h.SampleCount = proto.Uint64(helper.Count)
|
||||||
|
h.SampleSum = proto.Float64(helper.Sum)
|
||||||
|
h.Schema = proto.Int32(helper.Schema)
|
||||||
|
h.ZeroThreshold = proto.Float64(helper.ZeroThreshold)
|
||||||
|
h.ZeroCount = proto.Uint64(helper.ZeroCount)
|
||||||
|
h.PositiveSpan = make([]*dto.BucketSpan, len(helper.PositiveSpans))
|
||||||
|
for i, span := range helper.PositiveSpans {
|
||||||
|
h.PositiveSpan[i] = &dto.BucketSpan{
|
||||||
|
Offset: proto.Int32(span.Offset),
|
||||||
|
Length: proto.Uint32(span.Length),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
h.PositiveDelta = helper.PositiveBuckets
|
||||||
|
h.NegativeSpan = make([]*dto.BucketSpan, len(helper.NegativeSpans))
|
||||||
|
for i, span := range helper.NegativeSpans {
|
||||||
|
h.NegativeSpan[i] = &dto.BucketSpan{
|
||||||
|
Offset: proto.Int32(span.Offset),
|
||||||
|
Length: proto.Uint32(span.Length),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
h.NegativeDelta = helper.NegativeBuckets
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManagerCTZeroIngestionHistogram(t *testing.T) {
|
||||||
|
const mName = "expected_histogram"
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
inputHistSample *dto.Histogram
|
||||||
|
enableCTZeroIngestion bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "disabled with CT on histogram",
|
||||||
|
inputHistSample: func() *dto.Histogram {
|
||||||
|
h := generateTestHistogram(0)
|
||||||
|
h.CreatedTimestamp = timestamppb.Now()
|
||||||
|
return h
|
||||||
|
}(),
|
||||||
|
enableCTZeroIngestion: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "enabled with CT on histogram",
|
||||||
|
inputHistSample: func() *dto.Histogram {
|
||||||
|
h := generateTestHistogram(0)
|
||||||
|
h.CreatedTimestamp = timestamppb.Now()
|
||||||
|
return h
|
||||||
|
}(),
|
||||||
|
enableCTZeroIngestion: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "enabled without CT on histogram",
|
||||||
|
inputHistSample: func() *dto.Histogram {
|
||||||
|
h := generateTestHistogram(0)
|
||||||
|
return h
|
||||||
|
}(),
|
||||||
|
enableCTZeroIngestion: true,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
app := &collectResultAppender{}
|
||||||
|
scrapeManager, err := NewManager(
|
||||||
|
&Options{
|
||||||
|
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
|
||||||
|
EnableNativeHistogramsIngestion: true,
|
||||||
|
skipOffsetting: true,
|
||||||
|
},
|
||||||
|
log.NewLogfmtLogger(os.Stderr),
|
||||||
|
nil,
|
||||||
|
&collectResultAppendable{app},
|
||||||
|
prometheus.NewRegistry(),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
|
||||||
|
GlobalConfig: config.GlobalConfig{
|
||||||
|
// Disable regular scrapes.
|
||||||
|
ScrapeInterval: model.Duration(9999 * time.Minute),
|
||||||
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
||||||
|
// Ensure the proto is chosen. We need proto as it's the only protocol
|
||||||
|
// with the CT parsing support.
|
||||||
|
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
|
||||||
|
},
|
||||||
|
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
once := sync.Once{}
|
||||||
|
// Start fake HTTP target to that allow one scrape only.
|
||||||
|
server := httptest.NewServer(
|
||||||
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fail := true // TODO(bwplotka): Kill or use?
|
||||||
|
once.Do(func() {
|
||||||
|
fail = false
|
||||||
|
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
|
||||||
|
|
||||||
|
ctrType := dto.MetricType_HISTOGRAM
|
||||||
|
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
|
||||||
|
Name: proto.String(mName),
|
||||||
|
Type: &ctrType,
|
||||||
|
Metric: []*dto.Metric{{Histogram: tc.inputHistSample}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
if fail {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
serverURL, err := url.Parse(server.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Add fake target directly into tsets + reload. Normally users would use
|
||||||
|
// Manager.Run and wait for minimum 5s refresh interval.
|
||||||
|
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
|
||||||
|
"test": {{
|
||||||
|
Targets: []model.LabelSet{{
|
||||||
|
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
|
||||||
|
model.AddressLabel: model.LabelValue(serverURL.Host),
|
||||||
|
}},
|
||||||
|
}},
|
||||||
|
})
|
||||||
|
scrapeManager.reload()
|
||||||
|
|
||||||
|
var got []histogramSample
|
||||||
|
|
||||||
|
// Wait for one scrape.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
||||||
|
app.mtx.Lock()
|
||||||
|
defer app.mtx.Unlock()
|
||||||
|
|
||||||
|
// Check if scrape happened and grab the relevant histograms, they have to be there - or it's a bug
|
||||||
|
// and it's not worth waiting.
|
||||||
|
for _, h := range app.resultHistograms {
|
||||||
|
if h.metric.Get(model.MetricNameLabel) == mName {
|
||||||
|
got = append(got, h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(app.resultHistograms) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("expected some histogram samples, got none")
|
||||||
|
}), "after 1 minute")
|
||||||
|
scrapeManager.Stop()
|
||||||
|
|
||||||
|
// Check for zero samples, assuming we only injected always one histogram sample.
|
||||||
|
// Did it contain CT to inject? If yes, was CT zero enabled?
|
||||||
|
if tc.inputHistSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
|
||||||
|
require.Len(t, got, 2)
|
||||||
|
// Zero sample.
|
||||||
|
require.Equal(t, histogram.Histogram{}, *got[0].h)
|
||||||
|
// Quick soft check to make sure it's the same sample or at least not zero.
|
||||||
|
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].h.Sum)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expect only one, valid sample.
|
||||||
|
require.Len(t, got, 1)
|
||||||
|
// Quick soft check to make sure it's the same sample or at least not zero.
|
||||||
|
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].h.Sum)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestUnregisterMetrics(t *testing.T) {
|
func TestUnregisterMetrics(t *testing.T) {
|
||||||
reg := prometheus.NewRegistry()
|
reg := prometheus.NewRegistry()
|
||||||
// Check that all metrics can be unregistered, allowing a second manager to be created.
|
// Check that all metrics can be unregistered, allowing a second manager to be created.
|
||||||
|
|
|
@ -1701,7 +1701,15 @@ loop:
|
||||||
} else {
|
} else {
|
||||||
if sl.enableCTZeroIngestion {
|
if sl.enableCTZeroIngestion {
|
||||||
if ctMs := p.CreatedTimestamp(); ctMs != nil {
|
if ctMs := p.CreatedTimestamp(); ctMs != nil {
|
||||||
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
|
if isHistogram && sl.enableNativeHistogramIngestion {
|
||||||
|
if h != nil {
|
||||||
|
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, h, nil)
|
||||||
|
} else {
|
||||||
|
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, nil, fh)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
|
||||||
|
}
|
||||||
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
|
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
|
||||||
// CT is an experimental feature. For now, we don't need to fail the
|
// CT is an experimental feature. For now, we don't need to fail the
|
||||||
// scrape on errors updating the created timestamp, log debug.
|
// scrape on errors updating the created timestamp, log debug.
|
||||||
|
|
|
@ -1999,7 +1999,8 @@ metric: <
|
||||||
`,
|
`,
|
||||||
contentType: "application/vnd.google.protobuf",
|
contentType: "application/vnd.google.protobuf",
|
||||||
histograms: []histogramSample{{
|
histograms: []histogramSample{{
|
||||||
t: 1234568,
|
t: 1234568,
|
||||||
|
metric: labels.FromStrings("__name__", "test_histogram"),
|
||||||
h: &histogram.Histogram{
|
h: &histogram.Histogram{
|
||||||
Count: 175,
|
Count: 175,
|
||||||
ZeroCount: 2,
|
ZeroCount: 2,
|
||||||
|
@ -2125,7 +2126,8 @@ metric: <
|
||||||
{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: 1234568, f: 175},
|
{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: 1234568, f: 175},
|
||||||
},
|
},
|
||||||
histograms: []histogramSample{{
|
histograms: []histogramSample{{
|
||||||
t: 1234568,
|
t: 1234568,
|
||||||
|
metric: labels.FromStrings("__name__", "test_histogram"),
|
||||||
h: &histogram.Histogram{
|
h: &histogram.Histogram{
|
||||||
Count: 175,
|
Count: 175,
|
||||||
ZeroCount: 2,
|
ZeroCount: 2,
|
||||||
|
|
|
@ -190,6 +190,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64
|
||||||
return ref, nil
|
return ref, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fanoutAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
|
||||||
|
ref, err := f.primary.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, appender := range f.secondaries {
|
||||||
|
if _, err := appender.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
|
func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
|
||||||
ref, err := f.primary.UpdateMetadata(ref, l, m)
|
ref, err := f.primary.UpdateMetadata(ref, l, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -50,7 +50,8 @@ var (
|
||||||
// NOTE(bwplotka): This can be both an instrumentation failure or commonly expected
|
// NOTE(bwplotka): This can be both an instrumentation failure or commonly expected
|
||||||
// behaviour, and we currently don't have a way to determine this. As a result
|
// behaviour, and we currently don't have a way to determine this. As a result
|
||||||
// it's recommended to ignore this error for now.
|
// it's recommended to ignore this error for now.
|
||||||
ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring")
|
ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring")
|
||||||
|
ErrCTNewerThanSample = fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
|
||||||
)
|
)
|
||||||
|
|
||||||
// SeriesRef is a generic series reference. In prometheus it is either a
|
// SeriesRef is a generic series reference. In prometheus it is either a
|
||||||
|
@ -313,6 +314,20 @@ type HistogramAppender interface {
|
||||||
// pointer. AppendHistogram won't mutate the histogram, but in turn
|
// pointer. AppendHistogram won't mutate the histogram, but in turn
|
||||||
// depends on the caller to not mutate it either.
|
// depends on the caller to not mutate it either.
|
||||||
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
|
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
|
||||||
|
// AppendHistogramCTZeroSample adds synthetic zero sample for the given ct timestamp,
|
||||||
|
// which will be associated with given series, labels and the incoming
|
||||||
|
// sample's t (timestamp). AppendHistogramCTZeroSample returns error if zero sample can't be
|
||||||
|
// appended, for example when ct is too old, or when it would collide with
|
||||||
|
// incoming sample (sample has priority).
|
||||||
|
//
|
||||||
|
// AppendHistogramCTZeroSample has to be called before the corresponding histogram AppendHistogram.
|
||||||
|
// A series reference number is returned which can be used to modify the
|
||||||
|
// CT for the given series in the same or later transactions.
|
||||||
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
||||||
|
// to AppendHistogramCTZeroSample() at any point.
|
||||||
|
//
|
||||||
|
// If the reference is 0 it must not be used for caching.
|
||||||
|
AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MetadataUpdater provides an interface for associating metadata to stored series.
|
// MetadataUpdater provides an interface for associating metadata to stored series.
|
||||||
|
|
|
@ -306,6 +306,11 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels,
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *timestampTracker) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
// TODO: Implement
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
|
func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
// TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write.
|
// TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write.
|
||||||
// UpdateMetadata is no-op for remote write (where timestampTracker is being used) for now.
|
// UpdateMetadata is no-op for remote write (where timestampTracker is being used) for now.
|
||||||
|
|
|
@ -915,6 +915,13 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
// AppendCTZeroSample is no-op for remote-write for now.
|
||||||
|
// TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might
|
||||||
|
// replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218).
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) {
|
func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
if m.updateMetadataErr != nil {
|
if m.updateMetadataErr != nil {
|
||||||
return 0, m.updateMetadataErr
|
return 0, m.updateMetadataErr
|
||||||
|
|
|
@ -972,6 +972,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
|
||||||
return storage.SeriesRef(series.ref), nil
|
return storage.SeriesRef(series.ref), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *appender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
// TODO(bwplotka/arthursens): Wire metadata in the Agent's appender.
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
|
func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
// TODO: Wire metadata in the Agent's appender.
|
// TODO: Wire metadata in the Agent's appender.
|
||||||
return 0, nil
|
return 0, nil
|
||||||
|
|
|
@ -79,6 +79,16 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t
|
||||||
return a.app.AppendHistogram(ref, l, t, h, fh)
|
return a.app.AppendHistogram(ref, l, t, h, fh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *initAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
if a.app != nil {
|
||||||
|
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
|
||||||
|
}
|
||||||
|
a.head.initTime(t)
|
||||||
|
a.app = a.head.appender()
|
||||||
|
|
||||||
|
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
|
||||||
|
}
|
||||||
|
|
||||||
func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
if a.app != nil {
|
if a.app != nil {
|
||||||
return a.app.UpdateMetadata(ref, l, m)
|
return a.app.UpdateMetadata(ref, l, m)
|
||||||
|
@ -388,7 +398,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
||||||
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
|
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
|
||||||
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
|
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
|
||||||
if ct >= t {
|
if ct >= t {
|
||||||
return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
|
return 0, storage.ErrCTNewerThanSample
|
||||||
}
|
}
|
||||||
|
|
||||||
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||||
|
@ -747,6 +757,91 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
||||||
return storage.SeriesRef(s.ref), nil
|
return storage.SeriesRef(s.ref), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||||
|
if !a.head.opts.EnableNativeHistograms.Load() {
|
||||||
|
return 0, storage.ErrNativeHistogramsDisabled
|
||||||
|
}
|
||||||
|
|
||||||
|
if ct >= t {
|
||||||
|
return 0, storage.ErrCTNewerThanSample
|
||||||
|
}
|
||||||
|
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||||
|
if s == nil {
|
||||||
|
// Ensure no empty labels have gotten through.
|
||||||
|
lset = lset.WithoutEmpty()
|
||||||
|
if lset.IsEmpty() {
|
||||||
|
return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
|
||||||
|
}
|
||||||
|
|
||||||
|
if l, dup := lset.HasDuplicateLabelNames(); dup {
|
||||||
|
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
|
||||||
|
}
|
||||||
|
|
||||||
|
var created bool
|
||||||
|
var err error
|
||||||
|
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if created {
|
||||||
|
switch {
|
||||||
|
case h != nil:
|
||||||
|
s.lastHistogramValue = &histogram.Histogram{}
|
||||||
|
case fh != nil:
|
||||||
|
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
|
||||||
|
}
|
||||||
|
a.series = append(a.series, record.RefSeries{
|
||||||
|
Ref: s.ref,
|
||||||
|
Labels: lset,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case h != nil:
|
||||||
|
zeroHistogram := &histogram.Histogram{}
|
||||||
|
s.Lock()
|
||||||
|
_, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, false) // OOO is not allowed for CTZeroSamples.
|
||||||
|
if err != nil {
|
||||||
|
s.Unlock()
|
||||||
|
if errors.Is(err, storage.ErrOutOfOrderSample) {
|
||||||
|
return 0, storage.ErrOutOfOrderCT
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.pendingCommit = true
|
||||||
|
s.Unlock()
|
||||||
|
a.histograms = append(a.histograms, record.RefHistogramSample{
|
||||||
|
Ref: s.ref,
|
||||||
|
T: ct,
|
||||||
|
H: zeroHistogram,
|
||||||
|
})
|
||||||
|
a.histogramSeries = append(a.histogramSeries, s)
|
||||||
|
case fh != nil:
|
||||||
|
zeroFloatHistogram := &histogram.FloatHistogram{}
|
||||||
|
s.Lock()
|
||||||
|
_, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, false) // OOO is not allowed for CTZeroSamples.
|
||||||
|
if err != nil {
|
||||||
|
s.Unlock()
|
||||||
|
if errors.Is(err, storage.ErrOutOfOrderSample) {
|
||||||
|
return 0, storage.ErrOutOfOrderCT
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.pendingCommit = true
|
||||||
|
s.Unlock()
|
||||||
|
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
|
||||||
|
Ref: s.ref,
|
||||||
|
T: ct,
|
||||||
|
FH: zeroFloatHistogram,
|
||||||
|
})
|
||||||
|
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ct > a.maxt {
|
||||||
|
a.maxt = ct
|
||||||
|
}
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't
|
// UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't
|
||||||
// use getOrCreate or make any of the lset sanity checks that Append does.
|
// use getOrCreate or make any of the lset sanity checks that Append does.
|
||||||
func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {
|
func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {
|
||||||
|
|
|
@ -6363,6 +6363,154 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHeadAppender_AppendHistogramCTZeroSample(t *testing.T) {
|
||||||
|
testHistogram := tsdbutil.GenerateTestHistogram(1)
|
||||||
|
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
|
||||||
|
lbls := labels.FromStrings("foo", "bar")
|
||||||
|
type appendableHistograms struct {
|
||||||
|
ts int64
|
||||||
|
h *histogram.Histogram
|
||||||
|
fh *histogram.FloatHistogram
|
||||||
|
ct int64
|
||||||
|
}
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
appendableHistograms []appendableHistograms
|
||||||
|
expectedHistograms []chunks.Sample
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "In order ct+normal sample/histogram",
|
||||||
|
appendableHistograms: []appendableHistograms{
|
||||||
|
{ts: 100, h: testHistogram, ct: 1},
|
||||||
|
},
|
||||||
|
expectedHistograms: []chunks.Sample{
|
||||||
|
sample{t: 1, h: &histogram.Histogram{}},
|
||||||
|
sample{t: 100, h: testHistogram},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "In order ct+normal sample/floathistogram",
|
||||||
|
appendableHistograms: []appendableHistograms{
|
||||||
|
{ts: 100, fh: testFloatHistogram, ct: 1},
|
||||||
|
},
|
||||||
|
expectedHistograms: []chunks.Sample{
|
||||||
|
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||||
|
sample{t: 100, fh: testFloatHistogram},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Consecutive appends with same ct ignore ct/histogram",
|
||||||
|
appendableHistograms: []appendableHistograms{
|
||||||
|
{ts: 100, h: testHistogram, ct: 1},
|
||||||
|
{ts: 101, h: testHistogram, ct: 1},
|
||||||
|
},
|
||||||
|
expectedHistograms: func() []chunks.Sample {
|
||||||
|
hNoCounterReset := *testHistogram
|
||||||
|
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||||
|
return []chunks.Sample{
|
||||||
|
sample{t: 1, h: &histogram.Histogram{}},
|
||||||
|
sample{t: 100, h: testHistogram},
|
||||||
|
sample{t: 101, h: &hNoCounterReset},
|
||||||
|
}
|
||||||
|
}(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Consecutive appends with same ct ignore ct/floathistogram",
|
||||||
|
appendableHistograms: []appendableHistograms{
|
||||||
|
{ts: 100, fh: testFloatHistogram, ct: 1},
|
||||||
|
{ts: 101, fh: testFloatHistogram, ct: 1},
|
||||||
|
},
|
||||||
|
expectedHistograms: func() []chunks.Sample {
|
||||||
|
fhNoCounterReset := *testFloatHistogram
|
||||||
|
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||||
|
return []chunks.Sample{
|
||||||
|
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||||
|
sample{t: 100, fh: testFloatHistogram},
|
||||||
|
sample{t: 101, fh: &fhNoCounterReset},
|
||||||
|
}
|
||||||
|
}(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Consecutive appends with newer ct do not ignore ct/histogram",
|
||||||
|
appendableHistograms: []appendableHistograms{
|
||||||
|
{ts: 100, h: testHistogram, ct: 1},
|
||||||
|
{ts: 102, h: testHistogram, ct: 101},
|
||||||
|
},
|
||||||
|
expectedHistograms: []chunks.Sample{
|
||||||
|
sample{t: 1, h: &histogram.Histogram{}},
|
||||||
|
sample{t: 100, h: testHistogram},
|
||||||
|
sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.CounterReset}},
|
||||||
|
sample{t: 102, h: testHistogram},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Consecutive appends with newer ct do not ignore ct/floathistogram",
|
||||||
|
appendableHistograms: []appendableHistograms{
|
||||||
|
{ts: 100, fh: testFloatHistogram, ct: 1},
|
||||||
|
{ts: 102, fh: testFloatHistogram, ct: 101},
|
||||||
|
},
|
||||||
|
expectedHistograms: []chunks.Sample{
|
||||||
|
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||||
|
sample{t: 100, fh: testFloatHistogram},
|
||||||
|
sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.CounterReset}},
|
||||||
|
sample{t: 102, fh: testFloatHistogram},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "CT equals to previous sample timestamp is ignored/histogram",
|
||||||
|
appendableHistograms: []appendableHistograms{
|
||||||
|
{ts: 100, h: testHistogram, ct: 1},
|
||||||
|
{ts: 101, h: testHistogram, ct: 100},
|
||||||
|
},
|
||||||
|
expectedHistograms: func() []chunks.Sample {
|
||||||
|
hNoCounterReset := *testHistogram
|
||||||
|
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||||
|
return []chunks.Sample{
|
||||||
|
sample{t: 1, h: &histogram.Histogram{}},
|
||||||
|
sample{t: 100, h: testHistogram},
|
||||||
|
sample{t: 101, h: &hNoCounterReset},
|
||||||
|
}
|
||||||
|
}(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "CT equals to previous sample timestamp is ignored/floathistogram",
|
||||||
|
appendableHistograms: []appendableHistograms{
|
||||||
|
{ts: 100, fh: testFloatHistogram, ct: 1},
|
||||||
|
{ts: 101, fh: testFloatHistogram, ct: 100},
|
||||||
|
},
|
||||||
|
expectedHistograms: func() []chunks.Sample {
|
||||||
|
fhNoCounterReset := *testFloatHistogram
|
||||||
|
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||||
|
return []chunks.Sample{
|
||||||
|
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||||
|
sample{t: 100, fh: testFloatHistogram},
|
||||||
|
sample{t: 101, fh: &fhNoCounterReset},
|
||||||
|
}
|
||||||
|
}(),
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, head.Close())
|
||||||
|
}()
|
||||||
|
appender := head.Appender(context.Background())
|
||||||
|
for _, sample := range tc.appendableHistograms {
|
||||||
|
ref, err := appender.AppendHistogramCTZeroSample(0, lbls, sample.ts, sample.ct, sample.h, sample.fh)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = appender.AppendHistogram(ref, lbls, sample.ts, sample.h, sample.fh)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
require.NoError(t, appender.Commit())
|
||||||
|
|
||||||
|
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
|
||||||
|
require.NoError(t, err)
|
||||||
|
result := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||||
|
require.Equal(t, tc.expectedHistograms, result[`{foo="bar"}`])
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
|
func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
|
||||||
// Use a chunk range of 1 here so that if we attempted to determine if the head
|
// Use a chunk range of 1 here so that if we attempted to determine if the head
|
||||||
// was compactable using default values for min and max times, `Head.compactable()`
|
// was compactable using default values for min and max times, `Head.compactable()`
|
||||||
|
|
Loading…
Reference in a new issue