track the last segment ID for which we appended a sample per series, and

use that to know if we can delete a series from the WAL/checkpoint
entirely

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2023-04-26 16:26:34 -07:00
parent d4bf47766e
commit d07b70a2a8
4 changed files with 115 additions and 32 deletions

View file

@ -64,6 +64,18 @@ var (
defaultWALReplayConcurrency = runtime.GOMAXPROCS(0)
)
type seriesAndSegment struct {
*stripeSeries
segments map[storage.SeriesRef]int
}
func newSeriesAndSegment(stripeSize int, seriesCallback SeriesLifecycleCallback) seriesAndSegment {
var s seriesAndSegment
s.stripeSeries = newStripeSeries(stripeSize, seriesCallback)
s.segments = make(map[storage.SeriesRef]int)
return s
}
// Head handles reads and writes of time series data within a time window.
type Head struct {
chunkRange atomic.Int64
@ -94,7 +106,7 @@ type Head struct {
memChunkPool sync.Pool
// All series addressable by their ID or hash.
series *stripeSeries
series seriesAndSegment
deletedMtx sync.Mutex
deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until.
@ -296,7 +308,7 @@ func (h *Head) resetInMemoryState() error {
h.exemplarMetrics = em
h.exemplars = es
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
h.series = newSeriesAndSegment(h.opts.StripeSize, h.opts.SeriesCallback)
h.postings = index.NewUnorderedMemPostings()
h.tombstones = tombstones.NewMemTombstones()
h.deleted = map[chunks.HeadSeriesRef]int{}
@ -1211,10 +1223,10 @@ func (h *Head) truncateWAL(mint int64) error {
if h.series.getByID(id) != nil {
return true
}
h.deletedMtx.Lock()
_, ok := h.deleted[id]
h.deletedMtx.Unlock()
return ok
if h.series.segments[storage.SeriesRef(id)] > last {
return true
}
return false
}
h.metrics.checkpointCreationTotal.Inc()
if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
@ -1231,16 +1243,6 @@ func (h *Head) truncateWAL(mint int64) error {
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}
// The checkpoint is written and segments before it is truncated, so we no
// longer need to track deleted series that are before it.
h.deletedMtx.Lock()
for ref, segment := range h.deleted {
if segment < first {
delete(h.deleted, ref)
}
}
h.deletedMtx.Unlock()
h.metrics.checkpointDeleteTotal.Inc()
if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
@ -1488,21 +1490,6 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
h.tombstones.DeleteTombstones(deleted)
h.tombstones.TruncateBefore(mint)
if h.wal != nil {
_, last, _ := wlog.Segments(h.wal.Dir())
h.deletedMtx.Lock()
// Keep series records until we're past segment 'last'
// because the WAL will still have samples records with
// this ref ID. If we didn't keep these series records then
// on start up when we replay the WAL, or any other code
// that reads the WAL, wouldn't be able to use those
// samples since we would have no labels for that ref ID.
for ref := range deleted {
h.deleted[chunks.HeadSeriesRef(ref)] = last
}
h.deletedMtx.Unlock()
}
return actualInOrderMint, minOOOTime, minMmapFile
}

View file

@ -48,7 +48,11 @@ func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
a.head.initTime(t)
a.app = a.head.appender()
return a.app.Append(ref, lset, t, v)
ref, err := a.app.Append(ref, lset, t, v)
if err != nil {
a.head.series.segments[ref] = a.head.wal.CurrentSegment()
}
return ref, err
}
func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {

View file

@ -1095,6 +1095,94 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
require.Equal(t, 0, metadata)
}
func TestDeleteWithSegmentIndexLogic(t *testing.T) {
numSamples := 10000
// Enough samples to cause a checkpoint.
hb, w := newTestHead(t, int64(numSamples)*10, false, false)
for i := 0; i < numSamples; i++ {
app := hb.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings("a", "b"), int64(i), 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
require.NoError(t, hb.Truncate(1))
// Confirm there's been a checkpoint.
cdir, _, err := wlog.LastCheckpoint(w.Dir())
require.NoError(t, err)
// Read in checkpoint and WAL.
recs := readTestWAL(t, cdir)
recs = append(recs, readTestWAL(t, w.Dir())...)
var series, samples, stones, metadata int
for _, rec := range recs {
switch rec.(type) {
case []record.RefSeries:
series++
case []record.RefSample:
samples++
case []tombstones.Stone:
stones++
case []record.RefMetadata:
metadata++
default:
t.Fatalf("unknown record type")
}
}
require.Equal(t, 1, series)
require.Equal(t, 9999, samples)
require.Equal(t, 0, metadata)
// lets write some samples for a new series and truncate again to see if deletes work
// as expected
n := time.Now().Unix()
// create a new segment so we avoid samples for the previous series being in the same segment as
// our new series for the purposes of the test
w.NextSegmentSync()
for i := 0; i < numSamples; i++ {
app := hb.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings("b", "c"), int64(i)+n, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
require.NoError(t, hb.Truncate(n))
require.NoError(t, hb.Close())
// Confirm there's been a checkpoint.
cdir, _, err = wlog.LastCheckpoint(w.Dir())
require.NoError(t, err)
// Read in checkpoint and WAL.
recs = readTestWAL(t, cdir)
recs = append(recs, readTestWAL(t, w.Dir())...)
// reset the counters
series = 0
samples = 0
stones = 0
metadata = 0
for _, rec := range recs {
switch rec.(type) {
case []record.RefSeries:
series++
case []record.RefSample:
samples++
case []tombstones.Stone:
stones++
case []record.RefMetadata:
metadata++
default:
t.Fatalf("unknown record type")
}
}
require.Equal(t, 1, series)
require.Equal(t, 10000, samples)
require.Equal(t, 0, metadata)
}
func TestDelete_e2e(t *testing.T) {
numDatapoints := 1000
numRanges := 1000

View file

@ -748,6 +748,10 @@ func (w *WL) LastSegmentAndOffset() (seg, offset int, err error) {
return
}
func (w *WL) CurrentSegment() int {
return w.segment.i
}
// Truncate drops all segments before i.
func (w *WL) Truncate(i int) (err error) {
w.metrics.truncateTotal.Inc()