feat: Implement Created Timestamp ingestion for agent mode

Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>
This commit is contained in:
Arthur Silva Sens 2024-04-01 16:21:07 -03:00
parent b51bbdd7ad
commit 8f722b40b6
No known key found for this signature in database
4 changed files with 136 additions and 5 deletions

View file

@ -49,7 +49,8 @@ var (
// NOTE(bwplotka): This can be both an instrumentation failure or commonly expected
// behaviour, and we currently don't have a way to determine this. As a result
// it's recommended to ignore this error for now.
ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring")
ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring")
ErrCTNewerThanSample = fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
)
// SeriesRef is a generic series reference. In prometheus it is either a

View file

@ -963,9 +963,53 @@ func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Met
return 0, nil
}
func (a *appender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) {
// TODO(bwplotka): Wire metadata in the Agent's appender.
return 0, nil
// AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns
// error when sample can't be appended. See
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) {
if ct >= t {
return 0, storage.ErrCTNewerThanSample
}
series := a.series.GetByID(chunks.HeadSeriesRef(ref))
if series == nil {
l = l.WithoutEmpty()
if l.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
}
if lbl, dup := l.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
}
var created bool
series, created = a.getOrCreate(l)
if created {
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: series.ref,
Labels: l,
})
a.metrics.numActiveSeries.Inc()
}
}
series.Lock()
defer series.Unlock()
if ct <= series.lastTs {
return storage.SeriesRef(series.ref), storage.ErrOutOfOrderCT
}
// NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: series.ref,
T: ct,
V: 0,
})
a.sampleSeries = append(a.sampleSeries, series)
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
return storage.SeriesRef(series.ref), nil
}
// Commit submits the collected samples and purges the batch.

View file

@ -879,6 +879,92 @@ func TestDBAllowOOOSamples(t *testing.T) {
require.NoError(t, db.Close())
}
func TestAppendCTZeroSample(t *testing.T) {
type appendableSamples struct {
ts int64
val float64
ct int64
}
for _, tc := range []struct {
name string
appendableSamples []appendableSamples
appendedSamples float64
expectedSamples []model.Sample
}{
{
name: "In order ct+normal sample",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
},
appendedSamples: 2,
expectedSamples: []model.Sample{
{Timestamp: 1, Value: 0},
{Timestamp: 100, Value: 10},
},
},
{
name: "Consecutive appends with same ct ignore ct",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
{ts: 101, val: 10, ct: 1},
},
appendedSamples: 3,
expectedSamples: []model.Sample{
{Timestamp: 1, Value: 0},
{Timestamp: 100, Value: 10},
{Timestamp: 101, Value: 10},
},
},
{
name: "Consecutive appends with newer ct do not ignore ct",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
{ts: 102, val: 10, ct: 101},
},
appendedSamples: 4,
expectedSamples: []model.Sample{
{Timestamp: 1, Value: 0},
{Timestamp: 100, Value: 10},
{Timestamp: 101, Value: 0},
{Timestamp: 102, Value: 10},
},
},
{
name: "CT equals to previous sample timestamp is ignored",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
{ts: 101, val: 10, ct: 100},
},
appendedSamples: 3,
expectedSamples: []model.Sample{
{Timestamp: 1, Value: 0},
{Timestamp: 100, Value: 10},
{Timestamp: 101, Value: 10},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
reg := prometheus.NewRegistry()
db := createTestAgentDB(t, reg, DefaultOptions())
app := db.Appender(context.Background())
lbls := labels.FromStrings("foo", "bar")
for _, sample := range tc.appendableSamples {
_, err := app.AppendCTZeroSample(0, lbls, sample.ts, sample.ct)
if err != nil {
require.ErrorIs(t, err, storage.ErrOutOfOrderCT)
}
_, err = app.Append(0, lbls, sample.ts, sample.val)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, tc.appendedSamples, m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.NoError(t, db.Close())
})
}
}
func BenchmarkCreateSeries(b *testing.B) {
s := createTestAgentDB(b, nil, DefaultOptions())
defer s.Close()

View file

@ -394,7 +394,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
if ct >= t {
return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
return 0, storage.ErrCTNewerThanSample
}
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))