From 886945cda7acd1649200a094f9c385304c36e099 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Thu, 27 Jul 2023 09:28:26 -0400 Subject: [PATCH] tsdb/agent: ensure that new series get written to WAL on rollback (#12592) If a new series is introduced in a storage.Appender instance, that series should be written to the WAL once the storage.Appender is closed, even on Rollback. Previously, new series would only be written to the WAL when calling Commit. However, because the series is stored in memory regardless, subsequent calls to Commit may write samples to the WAL which reference a series ID that was never written. Related to #11589. It's likely that this fix also resolves this issue, but we need more testing from users to see if the problem persists after this fix; there may be more cases where samples get written to the WAL in Prometheus Agent mode without the corresponding series record. Signed-off-by: Robert Fratto --- tsdb/agent/db.go | 54 +++++++++++++++++++++++++++++++++++++++---- tsdb/agent/db_test.go | 4 ++-- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index d47095a238..2ed51e4b81 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -985,11 +985,25 @@ func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Met // Commit submits the collected samples and purges the batch. func (a *appender) Commit() error { + if err := a.log(); err != nil { + return err + } + + a.clearData() + a.appenderPool.Put(a) + return nil +} + +// log logs all pending data to the WAL. 
+func (a *appender) log() error { a.mtx.RLock() defer a.mtx.RUnlock() var encoder record.Encoder buf := a.bufPool.Get().([]byte) + defer func() { + a.bufPool.Put(buf) //nolint:staticcheck + }() if len(a.pendingSeries) > 0 { buf = encoder.Series(a.pendingSeries, buf) @@ -1051,12 +1065,11 @@ func (a *appender) Commit() error { } } - //nolint:staticcheck - a.bufPool.Put(buf) - return a.Rollback() + return nil } -func (a *appender) Rollback() error { +// clearData clears all pending data. +func (a *appender) clearData() { a.pendingSeries = a.pendingSeries[:0] a.pendingSamples = a.pendingSamples[:0] a.pendingHistograms = a.pendingHistograms[:0] @@ -1065,6 +1078,39 @@ func (a *appender) Rollback() error { a.sampleSeries = a.sampleSeries[:0] a.histogramSeries = a.histogramSeries[:0] a.floatHistogramSeries = a.floatHistogramSeries[:0] +} + +func (a *appender) Rollback() error { + // Series are created in-memory regardless of rollback. This means we must + // log them to the WAL, otherwise subsequent commits may reference a series + // which was never written to the WAL. + if err := a.logSeries(); err != nil { + return err + } + + a.clearData() a.appenderPool.Put(a) return nil } + +// logSeries logs only pending series records to the WAL. +func (a *appender) logSeries() error { + a.mtx.RLock() + defer a.mtx.RUnlock() + + if len(a.pendingSeries) > 0 { + buf := a.bufPool.Get().([]byte) + defer func() { + a.bufPool.Put(buf) //nolint:staticcheck + }() + + var encoder record.Encoder + buf = encoder.Series(a.pendingSeries, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + return nil +} diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index e284e1b77e..5c71d548f8 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -333,8 +333,8 @@ func TestRollback(t *testing.T) { } } - // Check that the rollback ensured nothing got stored. 
- require.Equal(t, 0, walSeriesCount, "series should not have been written to WAL") + // Check that only series get stored after calling Rollback. + require.Equal(t, numSeries*3, walSeriesCount, "series should have been written to WAL") require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL") require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL") require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL")