mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Extend AppendExemplar with AppendHints
Signed-off-by: Jesus Vazquez <jesusvzpg@gmail.com>
This commit is contained in:
parent
ee8a429af3
commit
a68557dd9c
|
@ -1643,7 +1643,7 @@ func (n notReadyAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64
|
|||
return 0, tsdb.ErrNotReady
|
||||
}
|
||||
|
||||
func (n notReadyAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (n notReadyAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
return 0, tsdb.ErrNotReady
|
||||
}
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64, *s
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar, *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
|
@ -137,7 +137,7 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels
|
|||
return ref, err
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
a.pendingExemplars = append(a.pendingExemplars, e)
|
||||
|
@ -145,7 +145,7 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
return a.next.AppendExemplar(ref, l, e)
|
||||
return a.next.AppendExemplar(ref, l, e, nil)
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
|
|
|
@ -1809,7 +1809,7 @@ loop:
|
|||
slices.SortFunc(exemplars, exemplar.Compare)
|
||||
outOfOrderExemplars := 0
|
||||
for _, e := range exemplars {
|
||||
_, exemplarErr := app.AppendExemplar(ref, lset, e)
|
||||
_, exemplarErr := app.AppendExemplar(ref, lset, e, nil)
|
||||
switch {
|
||||
case exemplarErr == nil:
|
||||
// Do nothing.
|
||||
|
|
|
@ -161,14 +161,14 @@ func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float
|
|||
return ref, nil
|
||||
}
|
||||
|
||||
func (f *fanoutAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) {
|
||||
ref, err := f.primary.AppendExemplar(ref, l, e)
|
||||
func (f *fanoutAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *AppendHints) (SeriesRef, error) {
|
||||
ref, err := f.primary.AppendExemplar(ref, l, e, nil)
|
||||
if err != nil {
|
||||
return ref, err
|
||||
}
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if _, err := appender.AppendExemplar(ref, l, e); err != nil {
|
||||
if _, err := appender.AppendExemplar(ref, l, e, nil); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -307,7 +307,7 @@ type ExemplarAppender interface {
|
|||
// Note that in our current implementation of Prometheus' exemplar storage
|
||||
// calls to Append should generate the reference numbers, AppendExemplar
|
||||
// generating a new reference number should be considered possible erroneous behaviour and be logged.
|
||||
AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error)
|
||||
AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *AppendHints) (SeriesRef, error)
|
||||
}
|
||||
|
||||
// HistogramAppender provides an interface for appending histograms to the storage.
|
||||
|
|
|
@ -294,7 +294,7 @@ func (t *timestampTracker) Append(ref storage.SeriesRef, l labels.Labels, ts int
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (t *timestampTracker) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
t.exemplars++
|
||||
return 0, nil
|
||||
}
|
||||
|
|
|
@ -256,7 +256,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
|||
|
||||
for _, ep := range ts.Exemplars {
|
||||
e := ep.ToExemplar(&b, nil)
|
||||
if _, err := app.AppendExemplar(0, ls, e); err != nil {
|
||||
if _, err := app.AppendExemplar(0, ls, e, nil); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, storage.ErrOutOfOrderExemplar):
|
||||
outOfOrderExemplarErrs++
|
||||
|
@ -441,7 +441,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
|||
// Exemplars.
|
||||
for _, ep := range ts.Exemplars {
|
||||
e := ep.ToExemplar(&b, req.Symbols)
|
||||
ref, err = app.AppendExemplar(ref, ls, e)
|
||||
ref, err = app.AppendExemplar(ref, ls, e, nil)
|
||||
if err == nil {
|
||||
rs.Exemplars++
|
||||
continue
|
||||
|
@ -572,12 +572,12 @@ func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.La
|
|||
return ref, nil
|
||||
}
|
||||
|
||||
func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
if e.Ts > app.maxTime {
|
||||
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
||||
}
|
||||
|
||||
ref, err := app.Appender.AppendExemplar(ref, l, e)
|
||||
ref, err := app.Appender.AppendExemplar(ref, l, e, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
|
@ -873,7 +873,7 @@ func (m *mockAppendable) Rollback() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (m *mockAppendable) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
if m.appendExemplarErr != nil {
|
||||
return 0, m.appendExemplarErr
|
||||
}
|
||||
|
|
|
@ -846,7 +846,7 @@ func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool
|
|||
return series, true
|
||||
}
|
||||
|
||||
func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
// Series references and chunk references are identical for agent mode.
|
||||
headRef := chunks.HeadSeriesRef(ref)
|
||||
|
||||
|
|
|
@ -66,20 +66,20 @@ func TestDB_InvalidSeries(t *testing.T) {
|
|||
sRef, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0, nil)
|
||||
require.NoError(t, err, "should not reject valid series")
|
||||
|
||||
_, err = app.AppendExemplar(0, labels.EmptyLabels(), exemplar.Exemplar{})
|
||||
_, err = app.AppendExemplar(0, labels.EmptyLabels(), exemplar.Exemplar{}, nil)
|
||||
require.EqualError(t, err, "unknown series ref when trying to add exemplar: 0")
|
||||
|
||||
e := exemplar.Exemplar{Labels: labels.FromStrings("a", "1", "a", "2")}
|
||||
_, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
require.ErrorIs(t, err, tsdb.ErrInvalidExemplar, "should reject duplicate labels")
|
||||
|
||||
e = exemplar.Exemplar{Labels: labels.FromStrings("a_somewhat_long_trace_id", "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa")}
|
||||
_, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
require.ErrorIs(t, err, storage.ErrExemplarLabelLength, "should reject too long label length")
|
||||
|
||||
// Inverse check
|
||||
e = exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true}
|
||||
_, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
require.NoError(t, err, "should not reject valid exemplars")
|
||||
})
|
||||
}
|
||||
|
@ -143,7 +143,7 @@ func TestCommit(t *testing.T) {
|
|||
Value: sample[0].F(),
|
||||
HasTs: true,
|
||||
}
|
||||
_, err = app.AppendExemplar(ref, lset, e)
|
||||
_, err = app.AppendExemplar(ref, lset, e, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
@ -714,21 +714,21 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
|
|||
// If the Labels, Value or Timestamp are different than the last exemplar,
|
||||
// then a new one should be appended; Otherwise, it should be skipped.
|
||||
e := exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true}
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
|
||||
e.Labels = labels.FromStrings("b", "2")
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
|
||||
e.Value = 42
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
|
||||
e.Ts = 25
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
_, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e, nil)
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -783,7 +783,7 @@ func TestDBAllowOOOSamples(t *testing.T) {
|
|||
Value: float64(i),
|
||||
HasTs: true,
|
||||
}
|
||||
_, err = app.AppendExemplar(ref, lset, e)
|
||||
_, err = app.AppendExemplar(ref, lset, e, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
@ -847,7 +847,7 @@ func TestDBAllowOOOSamples(t *testing.T) {
|
|||
Value: float64(i),
|
||||
HasTs: true,
|
||||
}
|
||||
_, err = app.AppendExemplar(ref, lset, e)
|
||||
_, err = app.AppendExemplar(ref, lset, e, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,21 +50,21 @@ func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
|||
return a.app.Append(ref, lset, t, v, nil)
|
||||
}
|
||||
|
||||
func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
// Check if exemplar storage is enabled.
|
||||
if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if a.app != nil {
|
||||
return a.app.AppendExemplar(ref, l, e)
|
||||
return a.app.AppendExemplar(ref, l, e, nil)
|
||||
}
|
||||
// We should never reach here given we would call Append before AppendExemplar
|
||||
// and we probably want to always base head/WAL min time on sample times.
|
||||
a.head.initTime(e.Ts)
|
||||
a.app = a.head.appender()
|
||||
|
||||
return a.app.AppendExemplar(ref, l, e)
|
||||
return a.app.AppendExemplar(ref, l, e, nil)
|
||||
}
|
||||
|
||||
func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
|
@ -595,7 +595,7 @@ func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogr
|
|||
|
||||
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
|
||||
// use getOrCreate or make any of the lset validity checks that Append does.
|
||||
func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (a *headAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
// Check if exemplar storage is enabled.
|
||||
if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 {
|
||||
return 0, nil
|
||||
|
@ -604,7 +604,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
|
|||
// Get Series
|
||||
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
if s == nil {
|
||||
s = a.head.series.getByHash(lset.Hash(), lset)
|
||||
s = a.head.series.getByHash(l.Hash(), l)
|
||||
if s != nil {
|
||||
ref = storage.SeriesRef(s.ref)
|
||||
}
|
||||
|
|
|
@ -3217,7 +3217,7 @@ func TestHeadExemplars(t *testing.T) {
|
|||
HasTs: true,
|
||||
Ts: -1000,
|
||||
Value: 1,
|
||||
})
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.NoError(t, head.Close())
|
||||
|
@ -3984,7 +3984,7 @@ func TestChunkSnapshot(t *testing.T) {
|
|||
},
|
||||
}
|
||||
expExemplars = append(expExemplars, e)
|
||||
_, err := app.AppendExemplar(ref, e.seriesLabels, e.e)
|
||||
_, err := app.AppendExemplar(ref, e.seriesLabels, e.e, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -6110,7 +6110,7 @@ func TestWALSampleAndExemplarOrder(t *testing.T) {
|
|||
app := h.Appender(context.Background())
|
||||
ref, err := tc.appendF(app, 10)
|
||||
require.NoError(t, err)
|
||||
app.AppendExemplar(ref, lbls, exemplar.Exemplar{Value: 1.0, Ts: 5})
|
||||
app.AppendExemplar(ref, lbls, exemplar.Exemplar{Value: 1.0, Ts: 5}, nil)
|
||||
|
||||
app.Commit()
|
||||
|
||||
|
@ -6147,7 +6147,7 @@ func TestHeadCompactionWhileAppendAndCommitExemplar(t *testing.T) {
|
|||
app.Commit()
|
||||
// Not adding a sample here to trigger the fault.
|
||||
app = h.Appender(context.Background())
|
||||
_, err = app.AppendExemplar(ref, lbls, exemplar.Exemplar{Value: 1, Ts: 20})
|
||||
_, err = app.AppendExemplar(ref, lbls, exemplar.Exemplar{Value: 1, Ts: 20}, nil)
|
||||
require.NoError(t, err)
|
||||
h.Truncate(10)
|
||||
app.Commit()
|
||||
|
|
|
@ -86,6 +86,6 @@ func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable {
|
|||
return s.exemplarStorage
|
||||
}
|
||||
|
||||
func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, hints *storage.AppendHints) (storage.SeriesRef, error) {
|
||||
return ref, s.exemplarStorage.AddExemplar(l, e)
|
||||
}
|
||||
|
|
|
@ -434,7 +434,7 @@ func TestEndpoints(t *testing.T) {
|
|||
},
|
||||
}
|
||||
for _, ed := range exemplars {
|
||||
_, err := storage.AppendExemplar(0, ed.SeriesLabels, ed.Exemplars[0])
|
||||
_, err := storage.AppendExemplar(0, ed.SeriesLabels, ed.Exemplars[0], nil)
|
||||
require.NoError(t, err, "failed to add exemplar: %+v", ed.Exemplars[0])
|
||||
}
|
||||
|
||||
|
@ -744,7 +744,7 @@ func TestQueryExemplars(t *testing.T) {
|
|||
|
||||
for _, te := range tc.exemplars {
|
||||
for _, e := range te.Exemplars {
|
||||
_, err := es.AppendExemplar(0, te.SeriesLabels, e)
|
||||
_, err := es.AppendExemplar(0, te.SeriesLabels, e, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
@ -3454,7 +3454,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
|
|||
|
||||
for _, te := range test.exemplars {
|
||||
for _, e := range te.Exemplars {
|
||||
_, err := es.AppendExemplar(0, te.SeriesLabels, e)
|
||||
_, err := es.AppendExemplar(0, te.SeriesLabels, e, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue