tsdb/agent: allow ingestion of OOO samples (#12897)

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
Signed-off-by: Levi Harrison <git@leviharrison.dev>
This commit is contained in:
Paschalis Tsilias 2023-10-13 17:33:09 +03:00 committed by Levi Harrison
parent 4df2f2432b
commit 42b8f2f5fc
2 changed files with 127 additions and 10 deletions

View file

@ -802,11 +802,6 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
series.Lock() series.Lock()
defer series.Unlock() defer series.Unlock()
if t < series.lastTs {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
// NOTE: always modify pendingSamples and sampleSeries together. // NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{ a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: series.ref, Ref: series.ref,
@ -930,11 +925,6 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
series.Lock() series.Lock()
defer series.Unlock() defer series.Unlock()
if t < series.lastTs {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
switch { switch {
case h != nil: case h != nil:
// NOTE: always modify pendingHistograms and histogramSeries together // NOTE: always modify pendingHistograms and histogramSeries together

View file

@ -751,3 +751,130 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
// We had 9 calls to AppendExemplar but only 4 of those should have gotten through. // We had 9 calls to AppendExemplar but only 4 of those should have gotten through.
require.Equal(t, 4, walExemplarsCount) require.Equal(t, 4, walExemplarsCount)
} }
func TestDBAllowOOOSamples(t *testing.T) {
const (
numDatapoints = 5
numHistograms = 5
numSeries = 4
offset = 100
)
reg := prometheus.NewRegistry()
s := createTestAgentDB(t, reg, DefaultOptions())
app := s.Appender(context.TODO())
// Let's add some samples in the [offset, offset+numDatapoints) range.
lbls := labelsForTest(t.Name(), numSeries)
for _, l := range lbls {
lset := labels.New(l...)
for i := offset; i < numDatapoints+offset; i++ {
ref, err := app.Append(0, lset, int64(i), float64(i))
require.NoError(t, err)
e := exemplar.Exemplar{
Labels: lset,
Ts: int64(i) * 2,
Value: float64(i),
HasTs: true,
}
_, err = app.AppendExemplar(ref, lset, e)
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
for i := offset; i < numDatapoints+offset; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i-offset], nil)
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
for i := offset; i < numDatapoints+offset; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i-offset])
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, float64(20), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, float64(40), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, s.Close())
// Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
// We need the original directory so we can recreate the storage for replay.
storageDir := filepath.Dir(s.wal.Dir())
// Replay the storage so that the lastTs for each series is recorded.
reg2 := prometheus.NewRegistry()
db, err := Open(s.logger, reg2, nil, storageDir, s.opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}
app = db.Appender(context.Background())
// Now the lastTs will have been recorded successfully.
// Let's try appending twice as many OOO samples in the [0, numDatapoints) range.
lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ {
ref, err := app.Append(0, lset, int64(i), float64(i))
require.NoError(t, err)
e := exemplar.Exemplar{
Labels: lset,
Ts: int64(i) * 2,
Value: float64(i),
HasTs: true,
}
_, err = app.AppendExemplar(ref, lset, e)
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
for i := 0; i < numDatapoints; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil)
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
for i := 0; i < numDatapoints; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i])
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
m = gatherFamily(t, reg2, "prometheus_agent_samples_appended_total")
require.Equal(t, float64(40), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, db.Close())
}