tsdb/agent: Synchronize appender code with grafana/agent main (#9664)

* tsdb/agent: synchronize appender code with grafana/agent main

This commit synchronize the appender code with grafana/agent main. This
includes adding support for appending exemplars.

Closes #9610

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb/agent: fix build error

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb/agent: introduce some exemplar tests, refactor tests a little

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb/agent: address review feedback

- Re-use hash when creating a new series
- Fix typo in exemplar append error

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb/agent: remove unused AddFast method

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb/agent: close wal reader after test

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* tsdb/agent: add out-of-order tracking, change series TS in commit

Signed-off-by: Robert Fratto <robertfratto@gmail.com>

* address review feedback

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
This commit is contained in:
Robert Fratto 2021-11-30 10:44:40 -05:00 committed by GitHub
parent b63d7c786e
commit 4cbddb41eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 322 additions and 266 deletions

View file

@ -16,9 +16,11 @@ package agent
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"path/filepath" "path/filepath"
"sync" "sync"
"time" "time"
"unicode/utf8"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
@ -93,6 +95,8 @@ type dbMetrics struct {
numActiveSeries prometheus.Gauge numActiveSeries prometheus.Gauge
numWALSeriesPendingDeletion prometheus.Gauge numWALSeriesPendingDeletion prometheus.Gauge
totalAppendedSamples prometheus.Counter totalAppendedSamples prometheus.Counter
totalAppendedExemplars prometheus.Counter
totalOutOfOrderSamples prometheus.Counter
walTruncateDuration prometheus.Summary walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter walCorruptionsTotal prometheus.Counter
walTotalReplayDuration prometheus.Gauge walTotalReplayDuration prometheus.Gauge
@ -119,6 +123,16 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
Help: "Total number of samples appended to the storage", Help: "Total number of samples appended to the storage",
}) })
m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_exemplars_appended_total",
Help: "Total number of exemplars appended to the storage",
})
m.totalOutOfOrderSamples = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_out_of_order_samples_total",
Help: "Total number of out of order samples ingestion failed attempts.",
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_agent_truncate_duration_seconds", Name: "prometheus_agent_truncate_duration_seconds",
Help: "Duration of WAL truncation.", Help: "Duration of WAL truncation.",
@ -159,6 +173,8 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
m.numActiveSeries, m.numActiveSeries,
m.numWALSeriesPendingDeletion, m.numWALSeriesPendingDeletion,
m.totalAppendedSamples, m.totalAppendedSamples,
m.totalAppendedExemplars,
m.totalOutOfOrderSamples,
m.walTruncateDuration, m.walTruncateDuration,
m.walCorruptionsTotal, m.walCorruptionsTotal,
m.walTotalReplayDuration, m.walTotalReplayDuration,
@ -180,6 +196,15 @@ func (m *dbMetrics) Unregister() {
m.numActiveSeries, m.numActiveSeries,
m.numWALSeriesPendingDeletion, m.numWALSeriesPendingDeletion,
m.totalAppendedSamples, m.totalAppendedSamples,
m.totalAppendedExemplars,
m.totalOutOfOrderSamples,
m.walTruncateDuration,
m.walCorruptionsTotal,
m.walTotalReplayDuration,
m.checkpointDeleteFail,
m.checkpointDeleteTotal,
m.checkpointCreationFail,
m.checkpointCreationTotal,
} }
for _, c := range cs { for _, c := range cs {
m.r.Unregister(c) m.r.Unregister(c)
@ -260,6 +285,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
DB: db, DB: db,
pendingSeries: make([]record.RefSeries, 0, 100), pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100), pendingSamples: make([]record.RefSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
} }
} }
@ -411,11 +437,8 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
return return
} }
decoded <- samples decoded <- samples
case record.Tombstones: case record.Tombstones, record.Exemplars:
// We don't care about tombstones // We don't care about tombstones or exemplars during replay.
continue
case record.Exemplars:
// We don't care about exemplars
continue continue
default: default:
errCh <- &wal.CorruptionErr{ errCh <- &wal.CorruptionErr{
@ -667,23 +690,19 @@ type appender struct {
pendingSeries []record.RefSeries pendingSeries []record.RefSeries
pendingSamples []record.RefSample pendingSamples []record.RefSample
pendingExamplars []record.RefExemplar
// Pointers to the series referenced by each element of pendingSamples.
// Series lock is not held on elements.
sampleSeries []*memSeries
} }
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if ref == 0 { // series references and chunk references are identical for agent mode.
r, err := a.Add(l, t, v) headRef := chunks.HeadSeriesRef(ref)
return storage.SeriesRef(r), err
}
return ref, a.AddFast(chunks.HeadSeriesRef(ref), t, v)
}
func (a *appender) Add(l labels.Labels, t int64, v float64) (chunks.HeadSeriesRef, error) {
hash := l.Hash()
series := a.series.GetByHash(hash, l)
if series != nil {
return series.ref, a.AddFast(series.ref, t, v)
}
series := a.series.GetByID(headRef)
if series == nil {
// Ensure no empty or duplicate labels have gotten through. This mirrors the // Ensure no empty or duplicate labels have gotten through. This mirrors the
// equivalent validation code in the TSDB's headAppender. // equivalent validation code in the TSDB's headAppender.
l = l.WithoutEmpty() l = l.WithoutEmpty()
@ -695,52 +714,88 @@ func (a *appender) Add(l labels.Labels, t int64, v float64) (chunks.HeadSeriesRe
return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl))
} }
ref := chunks.HeadSeriesRef(a.nextRef.Inc()) var created bool
series = &memSeries{ref: ref, lset: l, lastTs: t} series, created = a.getOrCreate(l)
if created {
a.pendingSeries = append(a.pendingSeries, record.RefSeries{ a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: ref, Ref: series.ref,
Labels: l, Labels: l,
}) })
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: ref,
T: t,
V: v,
})
a.series.Set(hash, series)
a.metrics.numActiveSeries.Inc() a.metrics.numActiveSeries.Inc()
a.metrics.totalAppendedSamples.Inc() }
return series.ref, nil
} }
func (a *appender) AddFast(ref chunks.HeadSeriesRef, t int64, v float64) error {
series := a.series.GetByID(ref)
if series == nil {
return storage.ErrNotFound
}
series.Lock() series.Lock()
defer series.Unlock() defer series.Unlock()
// Update last recorded timestamp. Used by Storage.gc to determine if a if t < series.lastTs {
// series is dead. a.metrics.totalOutOfOrderSamples.Inc()
series.lastTs = t return 0, storage.ErrOutOfOrderSample
}
// NOTE: always modify pendingSamples and sampleSeries together
a.pendingSamples = append(a.pendingSamples, record.RefSample{ a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: ref, Ref: series.ref,
T: t, T: t,
V: v, V: v,
}) })
a.sampleSeries = append(a.sampleSeries, series)
a.metrics.totalAppendedSamples.Inc() a.metrics.totalAppendedSamples.Inc()
return nil return storage.SeriesRef(series.ref), nil
}
func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) {
hash := l.Hash()
series = a.series.GetByHash(hash, l)
if series != nil {
return series, false
}
ref := chunks.HeadSeriesRef(a.nextRef.Inc())
series = &memSeries{ref: ref, lset: l, lastTs: math.MinInt64}
a.series.Set(hash, series)
return series, true
} }
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
// remote_write doesn't support exemplars yet, so do nothing here. // series references and chunk references are identical for agent mode.
return 0, nil headRef := chunks.HeadSeriesRef(ref)
s := a.series.GetByID(headRef)
if s == nil {
return 0, fmt.Errorf("unknown series ref when trying to add exemplar: %d", ref)
}
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()
if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(tsdb.ErrInvalidExemplar, fmt.Sprintf(`label name "%s" is not unique`, lbl))
}
// Exemplar label length does not include chars involved in text rendering such as quotes
// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
labelSetLen := 0
for _, l := range e.Labels {
labelSetLen += utf8.RuneCountInString(l.Name)
labelSetLen += utf8.RuneCountInString(l.Value)
if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
return 0, storage.ErrExemplarLabelLength
}
}
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
Ref: s.ref,
T: e.Ts,
V: e.Value,
Labels: e.Labels,
})
return storage.SeriesRef(s.ref), nil
} }
// Commit submits the collected samples and purges the batch. // Commit submits the collected samples and purges the batch.
@ -767,6 +822,22 @@ func (a *appender) Commit() error {
buf = buf[:0] buf = buf[:0]
} }
if len(a.pendingExamplars) > 0 {
buf = encoder.Exemplars(a.pendingExamplars, buf)
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
var series *memSeries
for i, s := range a.pendingSamples {
series = a.sampleSeries[i]
if !series.updateTimestamp(s.T) {
a.metrics.totalOutOfOrderSamples.Inc()
}
}
//nolint:staticcheck //nolint:staticcheck
a.bufPool.Put(buf) a.bufPool.Put(buf)
return a.Rollback() return a.Rollback()
@ -775,6 +846,8 @@ func (a *appender) Commit() error {
func (a *appender) Rollback() error { func (a *appender) Rollback() error {
a.pendingSeries = a.pendingSeries[:0] a.pendingSeries = a.pendingSeries[:0]
a.pendingSamples = a.pendingSamples[:0] a.pendingSamples = a.pendingSamples[:0]
a.pendingExamplars = a.pendingExamplars[:0]
a.sampleSeries = a.sampleSeries[:0]
a.appenderPool.Put(a) a.appenderPool.Put(a)
return nil return nil
} }

View file

@ -15,8 +15,8 @@ package agent
import ( import (
"context" "context"
"path/filepath"
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
@ -26,25 +26,70 @@ import (
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wal"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
func TestUnsupported(t *testing.T) { func TestDB_InvalidSeries(t *testing.T) {
promAgentDir := t.TempDir() s := createTestAgentDB(t, nil, DefaultOptions())
defer s.Close()
opts := DefaultOptions() app := s.Appender(context.Background())
logger := log.NewNopLogger()
s, err := Open(logger, prometheus.NewRegistry(), nil, promAgentDir, opts) t.Run("Samples", func(t *testing.T) {
_, err := app.Append(0, labels.Labels{}, 0, 0)
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels")
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}, 0, 0)
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels")
})
t.Run("Exemplars", func(t *testing.T) {
sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
require.NoError(t, err, "should not reject valid series")
_, err = app.AppendExemplar(0, nil, exemplar.Exemplar{})
require.EqualError(t, err, "unknown series ref when trying to add exemplar: 0")
e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}}
_, err = app.AppendExemplar(sRef, nil, e)
require.ErrorIs(t, err, tsdb.ErrInvalidExemplar, "should reject duplicate labels")
e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a_somewhat_long_trace_id", Value: "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa"}}}
_, err = app.AppendExemplar(sRef, nil, e)
require.ErrorIs(t, err, storage.ErrExemplarLabelLength, "should reject too long label length")
// Inverse check
e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
_, err = app.AppendExemplar(sRef, nil, e)
require.NoError(t, err, "should not reject valid exemplars")
})
}
func createTestAgentDB(t *testing.T, reg prometheus.Registerer, opts *Options) *DB {
t.Helper()
dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
db, err := Open(log.NewNopLogger(), reg, rs, dbDir, opts)
require.NoError(t, err) require.NoError(t, err)
defer func() { return db
require.NoError(t, s.Close()) }
}()
func TestUnsupportedFunctions(t *testing.T) {
s := createTestAgentDB(t, nil, DefaultOptions())
defer s.Close()
t.Run("Querier", func(t *testing.T) { t.Run("Querier", func(t *testing.T) {
_, err := s.Querier(context.TODO(), 0, 0) _, err := s.Querier(context.TODO(), 0, 0)
@ -68,93 +113,74 @@ func TestCommit(t *testing.T) {
numSeries = 8 numSeries = 8
) )
promAgentDir := t.TempDir() s := createTestAgentDB(t, nil, DefaultOptions())
app := s.Appender(context.TODO())
lbls := labelsForTest(t.Name(), numSeries) lbls := labelsForTest(t.Name(), numSeries)
opts := DefaultOptions()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
defer func(rs *remote.Storage) {
require.NoError(t, rs.Close())
}(remoteStorage)
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
require.NoError(t, err)
a := s.Appender(context.TODO())
for _, l := range lbls { for _, l := range lbls {
lset := labels.New(l...) lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
sample := tsdbutil.GenerateSamples(0, 1) sample := tsdbutil.GenerateSamples(0, 1)
_, err := a.Append(0, lset, sample[0].T(), sample[0].V()) ref, err := app.Append(0, lset, sample[0].T(), sample[0].V())
require.NoError(t, err)
e := exemplar.Exemplar{
Labels: lset,
Ts: sample[0].T(),
Value: sample[0].V(),
HasTs: true,
}
_, err = app.AppendExemplar(ref, lset, e)
require.NoError(t, err) require.NoError(t, err)
} }
} }
require.NoError(t, a.Commit()) require.NoError(t, app.Commit())
require.NoError(t, s.Close()) require.NoError(t, s.Close())
// Read records from WAL and check for expected count of series and samples. sr, err := wal.NewSegmentsReader(s.wal.Dir())
walSeriesCount := 0
walSamplesCount := 0
reg = prometheus.NewRegistry()
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
defer func() {
require.NoError(t, remoteStorage.Close())
}()
s1, err := Open(logger, nil, remoteStorage, promAgentDir, opts)
require.NoError(t, err)
defer func() {
require.NoError(t, s1.Close())
}()
var dec record.Decoder
if err == nil {
sr, err := wal.NewSegmentsReader(s1.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, sr.Close()) require.NoError(t, sr.Close())
}() }()
r := wal.NewReader(sr) // Read records from WAL and check for expected count of series, samples, and exemplars.
seriesPool := sync.Pool{ var (
New: func() interface{} { r = wal.NewReader(sr)
return []record.RefSeries{} dec record.Decoder
},
}
samplesPool := sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
walSeriesCount, walSamplesCount, walExemplarsCount int
)
for r.Next() { for r.Next() {
rec := r.Record() rec := r.Record()
switch dec.Type(rec) { switch dec.Type(rec) {
case record.Series: case record.Series:
series := seriesPool.Get().([]record.RefSeries)[:0] var series []record.RefSeries
series, _ = dec.Series(rec, series) series, err = dec.Series(rec, series)
require.NoError(t, err)
walSeriesCount += len(series) walSeriesCount += len(series)
case record.Samples: case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0] var samples []record.RefSample
samples, _ = dec.Samples(rec, samples) samples, err = dec.Samples(rec, samples)
require.NoError(t, err)
walSamplesCount += len(samples) walSamplesCount += len(samples)
case record.Exemplars:
var exemplars []record.RefExemplar
exemplars, err = dec.Exemplars(rec, exemplars)
require.NoError(t, err)
walExemplarsCount += len(exemplars)
default: default:
} }
} }
}
// Retrieved series count from WAL should match the count of series been added to the WAL. // Check that the WAL contained the same number of commited series/samples/exemplars.
require.Equal(t, walSeriesCount, numSeries) require.Equal(t, numSeries, walSeriesCount, "unexpected number of series")
require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples")
// Retrieved samples count from WAL should match the count of samples been added to the WAL. require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars")
require.Equal(t, walSamplesCount, numSeries*numDatapoints)
} }
func TestRollback(t *testing.T) { func TestRollback(t *testing.T) {
@ -163,93 +189,68 @@ func TestRollback(t *testing.T) {
numSeries = 8 numSeries = 8
) )
promAgentDir := t.TempDir() s := createTestAgentDB(t, nil, DefaultOptions())
app := s.Appender(context.TODO())
lbls := labelsForTest(t.Name(), numSeries) lbls := labelsForTest(t.Name(), numSeries)
opts := DefaultOptions()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
defer func(rs *remote.Storage) {
require.NoError(t, rs.Close())
}(remoteStorage)
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
require.NoError(t, err)
a := s.Appender(context.TODO())
for _, l := range lbls { for _, l := range lbls {
lset := labels.New(l...) lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
sample := tsdbutil.GenerateSamples(0, 1) sample := tsdbutil.GenerateSamples(0, 1)
_, err := a.Append(0, lset, sample[0].T(), sample[0].V()) _, err := app.Append(0, lset, sample[0].T(), sample[0].V())
require.NoError(t, err) require.NoError(t, err)
} }
} }
require.NoError(t, a.Rollback()) // Do a rollback, which should clear uncommitted data. A followup call to
// commit should persist nothing to the WAL.
require.NoError(t, app.Rollback())
require.NoError(t, app.Commit())
require.NoError(t, s.Close()) require.NoError(t, s.Close())
// Read records from WAL and check for expected count of series and samples. sr, err := wal.NewSegmentsReader(s.wal.Dir())
walSeriesCount := 0
walSamplesCount := 0
reg = prometheus.NewRegistry()
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
defer func() {
require.NoError(t, remoteStorage.Close())
}()
s1, err := Open(logger, nil, remoteStorage, promAgentDir, opts)
require.NoError(t, err)
defer func() {
require.NoError(t, s1.Close())
}()
var dec record.Decoder
if err == nil {
sr, err := wal.NewSegmentsReader(s1.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, sr.Close()) require.NoError(t, sr.Close())
}() }()
r := wal.NewReader(sr) // Read records from WAL and check for expected count of series and samples.
seriesPool := sync.Pool{ var (
New: func() interface{} { r = wal.NewReader(sr)
return []record.RefSeries{} dec record.Decoder
},
}
samplesPool := sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
walSeriesCount, walSamplesCount, walExemplarsCount int
)
for r.Next() { for r.Next() {
rec := r.Record() rec := r.Record()
switch dec.Type(rec) { switch dec.Type(rec) {
case record.Series: case record.Series:
series := seriesPool.Get().([]record.RefSeries)[:0] var series []record.RefSeries
series, _ = dec.Series(rec, series) series, err = dec.Series(rec, series)
require.NoError(t, err)
walSeriesCount += len(series) walSeriesCount += len(series)
case record.Samples: case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0] var samples []record.RefSample
samples, _ = dec.Samples(rec, samples) samples, err = dec.Samples(rec, samples)
require.NoError(t, err)
walSamplesCount += len(samples) walSamplesCount += len(samples)
case record.Exemplars:
var exemplars []record.RefExemplar
exemplars, err = dec.Exemplars(rec, exemplars)
require.NoError(t, err)
walExemplarsCount += len(exemplars)
default: default:
} }
} }
}
// Retrieved series count from WAL should be zero. // Check that the rollback ensured nothing got stored.
require.Equal(t, walSeriesCount, 0) require.Equal(t, 0, walSeriesCount, "series should not have been written to WAL")
require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL")
// Retrieved samples count from WAL should be zero. require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL")
require.Equal(t, walSamplesCount, 0)
} }
func TestFullTruncateWAL(t *testing.T) { func TestFullTruncateWAL(t *testing.T) {
@ -259,34 +260,25 @@ func TestFullTruncateWAL(t *testing.T) {
lastTs = 500 lastTs = 500
) )
promAgentDir := t.TempDir() reg := prometheus.NewRegistry()
lbls := labelsForTest(t.Name(), numSeries)
opts := DefaultOptions() opts := DefaultOptions()
opts.TruncateFrequency = time.Minute * 2 opts.TruncateFrequency = time.Minute * 2
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
defer func() {
require.NoError(t, remoteStorage.Close())
}()
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) s := createTestAgentDB(t, reg, opts)
require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, s.Close()) require.NoError(t, s.Close())
}() }()
app := s.Appender(context.TODO())
a := s.Appender(context.TODO()) lbls := labelsForTest(t.Name(), numSeries)
for _, l := range lbls { for _, l := range lbls {
lset := labels.New(l...) lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
_, err := a.Append(0, lset, int64(lastTs), 0) _, err := app.Append(0, lset, int64(lastTs), 0)
require.NoError(t, err) require.NoError(t, err)
} }
require.NoError(t, a.Commit()) require.NoError(t, app.Commit())
} }
// Truncate WAL with mint to GC all the samples. // Truncate WAL with mint to GC all the samples.
@ -302,52 +294,40 @@ func TestPartialTruncateWAL(t *testing.T) {
numSeries = 800 numSeries = 800
) )
promAgentDir := t.TempDir()
opts := DefaultOptions() opts := DefaultOptions()
opts.TruncateFrequency = time.Minute * 2 opts.TruncateFrequency = time.Minute * 2
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
defer func() {
require.NoError(t, remoteStorage.Close())
}()
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts) reg := prometheus.NewRegistry()
require.NoError(t, err) s := createTestAgentDB(t, reg, opts)
defer func() { defer func() {
require.NoError(t, s.Close()) require.NoError(t, s.Close())
}() }()
app := s.Appender(context.TODO())
a := s.Appender(context.TODO())
var lastTs int64
// Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500. // Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500.
lastTs = 500 var lastTs int64 = 500
lbls := labelsForTest(t.Name()+"batch-1", numSeries) lbls := labelsForTest(t.Name()+"batch-1", numSeries)
for _, l := range lbls { for _, l := range lbls {
lset := labels.New(l...) lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
_, err := a.Append(0, lset, lastTs, 0) _, err := app.Append(0, lset, lastTs, 0)
require.NoError(t, err) require.NoError(t, err)
} }
require.NoError(t, a.Commit()) require.NoError(t, app.Commit())
} }
// Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600. // Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600.
lastTs = 600 lastTs = 600
lbls = labelsForTest(t.Name()+"batch-2", numSeries) lbls = labelsForTest(t.Name()+"batch-2", numSeries)
for _, l := range lbls { for _, l := range lbls {
lset := labels.New(l...) lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
_, err := a.Append(0, lset, lastTs, 0) _, err := app.Append(0, lset, lastTs, 0)
require.NoError(t, err) require.NoError(t, err)
} }
require.NoError(t, a.Commit()) require.NoError(t, app.Commit())
} }
// Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series. // Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series.
@ -364,53 +344,41 @@ func TestWALReplay(t *testing.T) {
lastTs = 500 lastTs = 500
) )
promAgentDir := t.TempDir() s := createTestAgentDB(t, nil, DefaultOptions())
app := s.Appender(context.TODO())
lbls := labelsForTest(t.Name(), numSeries) lbls := labelsForTest(t.Name(), numSeries)
opts := DefaultOptions()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), reg, startTime, promAgentDir, time.Second*30, nil)
defer func() {
require.NoError(t, remoteStorage.Close())
}()
s, err := Open(logger, reg, remoteStorage, promAgentDir, opts)
require.NoError(t, err)
a := s.Appender(context.TODO())
for _, l := range lbls { for _, l := range lbls {
lset := labels.New(l...) lset := labels.New(l...)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
_, err := a.Append(0, lset, lastTs, 0) _, err := app.Append(0, lset, lastTs, 0)
require.NoError(t, err) require.NoError(t, err)
} }
} }
require.NoError(t, a.Commit()) require.NoError(t, app.Commit())
require.NoError(t, s.Close()) require.NoError(t, s.Close())
restartOpts := DefaultOptions() // Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
restartLogger := log.NewNopLogger() // We need the original directory so we can recreate the storage for replay.
restartReg := prometheus.NewRegistry() storageDir := filepath.Dir(s.wal.Dir())
// Open a new DB with the same WAL to check that series from the previous DB reg := prometheus.NewRegistry()
// get replayed. replayStorage, err := Open(s.logger, reg, nil, storageDir, s.opts)
replayDB, err := Open(restartLogger, restartReg, nil, promAgentDir, restartOpts) if err != nil {
require.NoError(t, err) t.Fatalf("unable to create storage for the agent: %v", err)
}
defer func() { defer func() {
require.NoError(t, replayDB.Close()) require.NoError(t, replayStorage.Close())
}() }()
// Check if all the series are retrieved back from the WAL. // Check if all the series are retrieved back from the WAL.
m := gatherFamily(t, restartReg, "prometheus_agent_active_series") m := gatherFamily(t, reg, "prometheus_agent_active_series")
require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count") require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count")
// Check if lastTs of the samples retrieved from the WAL is retained. // Check if lastTs of the samples retrieved from the WAL is retained.
metrics := replayDB.series.series metrics := replayStorage.series.series
for i := 0; i < len(metrics); i++ { for i := 0; i < len(metrics); i++ {
mp := metrics[i] mp := metrics[i]
for _, v := range mp { for _, v := range mp {

View file

@ -26,9 +26,24 @@ type memSeries struct {
ref chunks.HeadSeriesRef ref chunks.HeadSeriesRef
lset labels.Labels lset labels.Labels
// Last recorded timestamp. Used by Storage.gc to determine if a series is
// stale.
lastTs int64 lastTs int64
} }
// updateTimestamp obtains the lock on s and will attempt to update lastTs.
// fails if newTs < lastTs.
func (m *memSeries) updateTimestamp(newTs int64) bool {
m.Lock()
defer m.Unlock()
if newTs >= m.lastTs {
m.lastTs = newTs
return true
}
return false
}
// seriesHashmap is a simple hashmap for memSeries by their label set. // seriesHashmap is a simple hashmap for memSeries by their label set.
// It is built on top of a regular hashmap and holds a slice of series to // It is built on top of a regular hashmap and holds a slice of series to
// resolve hash collisions. Its methods require the hash to be submitted // resolve hash collisions. Its methods require the hash to be submitted