WIP: Histogram CT Zero ingestion
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
This commit is contained in:
Arthur Silva Sens 2024-07-19 11:28:00 -03:00
parent 9849418fac
commit 5a78f8295d
No known key found for this signature in database
10 changed files with 254 additions and 9 deletions

View file

@ -1584,6 +1584,10 @@ func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}

View file

@ -55,6 +55,10 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h
return 0, nil
}
func (a nopAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, nil
}
func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
return 0, nil
}
@ -154,6 +158,17 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.
return a.next.AppendHistogram(ref, l, t, h, fh)
}
func (a *collectResultAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
zeroHistogram := &histogram.Histogram{Count: 0, Sum: 0, Schema: 3, ZeroThreshold: 0.0, ZeroCount: 0}
return a.AppendHistogram(ref, l, ct, zeroHistogram, nil)
}
zeroFloatHistogram := &histogram.FloatHistogram{Count: 0, Sum: 0, Schema: 3, ZeroThreshold: 0.0, ZeroCount: 0}
return a.AppendHistogram(ref, l, ct, nil, zeroFloatHistogram)
}
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()

View file

@ -37,6 +37,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/util/runutil"
@ -857,6 +858,177 @@ func getResultFloats(app *collectResultAppender, expectedMetricName string) (res
return result
}
func TestManagerCTZeroIngestionHistogram(t *testing.T) {
const mName = "expected_histogram"
for _, tc := range []struct {
name string
h *dto.Histogram
enableCTZeroIngestion bool
expectedValues []histogram.Histogram
}{
{
name: "disabled with CT on histogram",
h: &dto.Histogram{
SampleCount: proto.Uint64(4),
SampleSum: proto.Float64(6),
Schema: proto.Int32(3),
ZeroThreshold: proto.Float64(2.938735877055719e-39),
ZeroCount: proto.Uint64(1),
PositiveSpan: []*dto.BucketSpan{
{Offset: proto.Int32(0), Length: proto.Uint32(1)},
{Offset: proto.Int32(7), Length: proto.Uint32(1)},
{Offset: proto.Int32(4), Length: proto.Uint32(1)},
},
PositiveDelta: []int64{1, 0, 0},
CreatedTimestamp: timestamppb.Now(),
},
enableCTZeroIngestion: false,
expectedValues: []histogram.Histogram{
// Exposed sample
{
Count: 4,
Sum: 6,
Schema: 3,
ZeroThreshold: 2.938735877055719e-39,
ZeroCount: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 1},
{Offset: 7, Length: 1},
{Offset: 4, Length: 1},
},
},
},
},
{
name: "enabled with CT on histogram",
h: &dto.Histogram{
SampleCount: proto.Uint64(4),
SampleSum: proto.Float64(6),
Schema: proto.Int32(3),
ZeroThreshold: proto.Float64(2.938735877055719e-39),
ZeroCount: proto.Uint64(1),
PositiveSpan: []*dto.BucketSpan{
{Offset: proto.Int32(0), Length: proto.Uint32(1)},
{Offset: proto.Int32(7), Length: proto.Uint32(1)},
{Offset: proto.Int32(4), Length: proto.Uint32(1)},
},
PositiveDelta: []int64{1, 0, 0},
CreatedTimestamp: timestamppb.Now(),
},
enableCTZeroIngestion: true,
expectedValues: []histogram.Histogram{
// CT
{Count: 0, Sum: 0, Schema: 0, ZeroThreshold: 0.0, ZeroCount: 0},
// Exposed sample
{
Count: 4,
Sum: 6,
Schema: 3,
ZeroThreshold: 2.938735877055719e-39,
ZeroCount: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 1},
{Offset: 7, Length: 1},
{Offset: 4, Length: 1},
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
skipOffsetting: true,
},
log.NewLogfmtLogger(os.Stderr),
&collectResultAppendable{app},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
// Disable regular scrapes.
ScrapeInterval: model.Duration(9999 * time.Minute),
ScrapeTimeout: model.Duration(5 * time.Second),
// Ensure the proto is chosen. We need proto as it's the only protocol
// with the CT parsing support.
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
once := sync.Once{}
// Start fake HTTP target to that allow one scrape only.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fail := true
once.Do(func() {
fail = false
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
ctrType := dto.MetricType_HISTOGRAM
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Histogram: tc.h}},
}))
})
if fail {
w.WriteHeader(http.StatusInternalServerError)
}
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
scrapeManager.reload()
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
for _, expected := range tc.expectedValues {
if countHistogramSamples(app, expected.String()) != int(expected.Count) {
return fmt.Errorf("expected %v histograms for %s", expected, expected.String())
}
}
return nil
}), "after 1 minute")
scrapeManager.Stop()
require.Equal(t, tc.expectedValues, getResultHistograms(app, mName))
})
}
}
func countHistogramSamples(a *collectResultAppender, expectedMetricName string) (count int) {
fmt.Println("Implement me")
return count
}
func getResultHistograms(app *collectResultAppender, expectedMetricName string) (result []histogramSample) {
fmt.Println("Implement me")
return result
}
func TestUnregisterMetrics(t *testing.T) {
reg := prometheus.NewRegistry()
// Check that all metrics can be unregistered, allowing a second manager to be created.

View file

@ -1634,24 +1634,32 @@ loop:
if seriesAlreadyScraped {
err = storage.ErrDuplicateSampleForTimestamp
} else {
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if ctMs := p.CreatedTimestamp(); ctMs != nil {
if isHistogram {
if h != nil {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, h, nil)
} else {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, nil, fh)
}
} else {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
}
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
}
}
}
if isHistogram && sl.enableNativeHistogramIngestion {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
}
if isHistogram {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
} else {
ref, err = app.Append(ref, lset, t, val)
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
}
} else {
ref, err = app.Append(ref, lset, t, val)
}
if err == nil {

View file

@ -190,6 +190,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64
return ref, nil
}
func (f *fanoutAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
ref, err := f.primary.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh); err != nil {
return 0, err
}
}
return ref, nil
}
func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
ref, err := f.primary.UpdateMetadata(ref, l, m)
if err != nil {

View file

@ -312,6 +312,8 @@ type HistogramAppender interface {
// pointer. AppendHistogram won't mutate the histogram, but in turn
// depends on the caller to not mutate it either.
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
// AppendHistogramCTZeroSample does the same thing as AppendCTZeroSample but for histograms.
AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
}
// MetadataUpdater provides an interface for associating metadata to stored series.

View file

@ -306,6 +306,11 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels,
return 0, nil
}
func (t *timestampTracker) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
//TODO: Implement
return 0, nil
}
func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write.
// UpadteMetadata is no-op for remote write (where timestampTracker is being used) for now.

View file

@ -892,6 +892,11 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
return 0, nil
}
func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
//TODO: Implement this.
return 0, nil
}
func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) {
if m.updateMetadataErr != nil {
return 0, m.updateMetadataErr

View file

@ -972,6 +972,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
return storage.SeriesRef(series.ref), nil
}
func (a *appender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
//TODO: Implement this.
return 0, nil
}
func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Wire metadata in the Agent's appender.
return 0, nil

View file

@ -78,6 +78,16 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t
return a.app.AppendHistogram(ref, l, t, h, fh)
}
func (a *initAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}
func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.UpdateMetadata(ref, l, m)
@ -672,6 +682,11 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
//TODO: Implement this.
return 0, nil
}
// UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset sanity checks that Append does.
func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {