mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Keep series that are still in WAL in checkpoints (#577)
If all the samples are deleted for a series, we should still keep the series in the WAL as anything else reading the WAL will still care about it in order to understand the samples. Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
259847a6b1
commit
dfed85e4a4
38
head.go
38
head.go
|
@ -75,6 +75,9 @@ type Head struct {
|
|||
symbols map[string]struct{}
|
||||
values map[string]stringset // label names to possible values
|
||||
|
||||
deletedMtx sync.Mutex
|
||||
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
|
||||
|
||||
postings *index.MemPostings // postings lists for terms
|
||||
}
|
||||
|
||||
|
@ -234,6 +237,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
|
|||
values: map[string]stringset{},
|
||||
symbols: map[string]struct{}{},
|
||||
postings: index.NewUnorderedMemPostings(),
|
||||
deleted: map[uint64]int{},
|
||||
}
|
||||
h.metrics = newHeadMetrics(h, r)
|
||||
|
||||
|
@ -557,7 +561,13 @@ func (h *Head) Truncate(mint int64) (err error) {
|
|||
}
|
||||
|
||||
keep := func(id uint64) bool {
|
||||
return h.series.getByID(id) != nil
|
||||
if h.series.getByID(id) != nil {
|
||||
return true
|
||||
}
|
||||
h.deletedMtx.Lock()
|
||||
_, ok := h.deleted[id]
|
||||
h.deletedMtx.Unlock()
|
||||
return ok
|
||||
}
|
||||
h.metrics.checkpointCreationTotal.Inc()
|
||||
if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil {
|
||||
|
@ -570,6 +580,17 @@ func (h *Head) Truncate(mint int64) (err error) {
|
|||
// that supersedes them.
|
||||
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
|
||||
}
|
||||
|
||||
// The checkpoint is written and segments before it is truncated, so we no
|
||||
// longer need to track deleted series that are before it.
|
||||
h.deletedMtx.Lock()
|
||||
for ref, segment := range h.deleted {
|
||||
if segment < first {
|
||||
delete(h.deleted, ref)
|
||||
}
|
||||
}
|
||||
h.deletedMtx.Unlock()
|
||||
|
||||
h.metrics.checkpointDeleteTotal.Inc()
|
||||
if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil {
|
||||
// Leftover old checkpoints do not cause problems down the line beyond
|
||||
|
@ -953,6 +974,21 @@ func (h *Head) gc() {
|
|||
// Remove deleted series IDs from the postings lists.
|
||||
h.postings.Delete(deleted)
|
||||
|
||||
if h.wal != nil {
|
||||
_, last, _ := h.wal.Segments()
|
||||
h.deletedMtx.Lock()
|
||||
// Keep series records until we're past segment 'last'
|
||||
// because the WAL will still have samples records with
|
||||
// this ref ID. If we didn't keep these series records then
|
||||
// on start up when we replay the WAL, or any other code
|
||||
// that reads the WAL, wouldn't be able to use those
|
||||
// samples since we would have no labels for that ref ID.
|
||||
for ref := range deleted {
|
||||
h.deleted[ref] = last
|
||||
}
|
||||
h.deletedMtx.Unlock()
|
||||
}
|
||||
|
||||
// Rebuild symbols and label value indices from what is left in the postings terms.
|
||||
symbols := make(map[string]struct{}, len(h.symbols))
|
||||
values := make(map[string]stringset, len(h.values))
|
||||
|
|
51
head_test.go
51
head_test.go
|
@ -501,6 +501,57 @@ func TestDeleteUntilCurMax(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, ressmpls)
|
||||
}
|
||||
|
||||
func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "test_delete_wal")
|
||||
testutil.Ok(t, err)
|
||||
defer func() {
|
||||
testutil.Ok(t, os.RemoveAll(dir))
|
||||
}()
|
||||
wlog, err := wal.NewSize(nil, nil, dir, 32768)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
// Enough samples to cause a checkpoint.
|
||||
numSamples := 10000
|
||||
hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10)
|
||||
testutil.Ok(t, err)
|
||||
defer hb.Close()
|
||||
for i := 0; i < numSamples; i++ {
|
||||
app := hb.Appender()
|
||||
_, err := app.Add(labels.Labels{{"a", "b"}}, int64(i), 0)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
}
|
||||
testutil.Ok(t, hb.Delete(0, int64(numSamples), labels.NewEqualMatcher("a", "b")))
|
||||
testutil.Ok(t, hb.Truncate(1))
|
||||
testutil.Ok(t, hb.Close())
|
||||
|
||||
// Confirm there's been a checkpoint.
|
||||
cdir, _, err := LastCheckpoint(dir)
|
||||
testutil.Ok(t, err)
|
||||
// Read in checkpoint and WAL.
|
||||
recs := readTestWAL(t, cdir)
|
||||
recs = append(recs, readTestWAL(t, dir)...)
|
||||
|
||||
var series, samples, stones int
|
||||
for _, rec := range recs {
|
||||
switch rec.(type) {
|
||||
case []RefSeries:
|
||||
series++
|
||||
case []RefSample:
|
||||
samples++
|
||||
case []Stone:
|
||||
stones++
|
||||
default:
|
||||
t.Fatalf("unknown record type")
|
||||
}
|
||||
}
|
||||
testutil.Equals(t, 1, series)
|
||||
testutil.Equals(t, 9999, samples)
|
||||
testutil.Equals(t, 1, stones)
|
||||
|
||||
}
|
||||
|
||||
func TestDelete_e2e(t *testing.T) {
|
||||
numDatapoints := 1000
|
||||
numRanges := 1000
|
||||
|
|
Loading…
Reference in a new issue