mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Fix deletion of old snapshots (#9314)
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
48a101be1b
commit
8944520ccc
|
@ -25,6 +25,7 @@ import (
|
|||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -2586,6 +2587,32 @@ func TestChunkSnapshot(t *testing.T) {
|
|||
require.Equal(t, expExemplars, actExemplars)
|
||||
}
|
||||
|
||||
var (
|
||||
wlast, woffset int
|
||||
err error
|
||||
)
|
||||
|
||||
closeHeadAndCheckSnapshot := func() {
|
||||
require.NoError(t, head.Close())
|
||||
|
||||
_, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, wlast, sidx)
|
||||
require.Equal(t, woffset, soffset)
|
||||
}
|
||||
|
||||
openHeadAndCheckReplay := func() {
|
||||
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, w, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
|
||||
checkSamples()
|
||||
checkTombstones()
|
||||
checkExemplars()
|
||||
}
|
||||
|
||||
{ // Initial data that goes into snapshot.
|
||||
// Add some initial samples with >=1 m-map chunk.
|
||||
app := head.Appender(context.Background())
|
||||
|
@ -2630,31 +2657,16 @@ func TestChunkSnapshot(t *testing.T) {
|
|||
}
|
||||
|
||||
// These references should be the ones used for the snapshot.
|
||||
wlast, woffset, err := head.wal.LastSegmentAndOffset()
|
||||
wlast, woffset, err = head.wal.LastSegmentAndOffset()
|
||||
require.NoError(t, err)
|
||||
|
||||
{ // Creating snapshot and verifying it.
|
||||
{
|
||||
// Creating snapshot and verifying it.
|
||||
head.opts.EnableMemorySnapshotOnShutdown = true
|
||||
require.NoError(t, head.Close()) // This will create a snapshot.
|
||||
closeHeadAndCheckSnapshot() // This will create a snapshot.
|
||||
|
||||
_, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, wlast, sidx)
|
||||
require.Equal(t, woffset, soffset)
|
||||
}
|
||||
|
||||
{ // Test the replay of snapshot.
|
||||
// Create new Head which should replay this snapshot.
|
||||
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, w, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
|
||||
// Test query for snapshot replay.
|
||||
checkSamples()
|
||||
checkTombstones()
|
||||
checkExemplars()
|
||||
// Test the replay of snapshot.
|
||||
openHeadAndCheckReplay()
|
||||
}
|
||||
|
||||
{ // Additional data to only include in WAL and m-mapped chunks and not snapshot. This mimics having an old snapshot on disk.
|
||||
|
@ -2700,30 +2712,42 @@ func TestChunkSnapshot(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
{ // Close Head and verify that new snapshot was not created.
|
||||
{
|
||||
// Close Head and verify that new snapshot was not created.
|
||||
head.opts.EnableMemorySnapshotOnShutdown = false
|
||||
require.NoError(t, head.Close()) // This should not create a snapshot.
|
||||
closeHeadAndCheckSnapshot() // This should not create a snapshot.
|
||||
|
||||
_, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, wlast, sidx)
|
||||
require.Equal(t, woffset, soffset)
|
||||
}
|
||||
|
||||
{ // Test the replay of snapshot, m-map chunks, and WAL.
|
||||
// Create new Head to replay snapshot, m-map chunks, and WAL.
|
||||
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
// Test the replay of snapshot, m-map chunks, and WAL.
|
||||
head.opts.EnableMemorySnapshotOnShutdown = true // Enabled to read from snapshot.
|
||||
head, err = NewHead(nil, nil, w, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
|
||||
// Test query when data is replayed from snapshot, m-map chunks, and WAL.
|
||||
checkSamples()
|
||||
checkTombstones()
|
||||
checkExemplars()
|
||||
openHeadAndCheckReplay()
|
||||
}
|
||||
|
||||
// Creating another snapshot should delete the older snapshot and replay still works fine.
|
||||
wlast, woffset, err = head.wal.LastSegmentAndOffset()
|
||||
require.NoError(t, err)
|
||||
|
||||
{
|
||||
// Close Head and verify that new snapshot was created.
|
||||
closeHeadAndCheckSnapshot()
|
||||
|
||||
// Verify that there is only 1 snapshot.
|
||||
files, err := ioutil.ReadDir(head.opts.ChunkDirRoot)
|
||||
require.NoError(t, err)
|
||||
snapshots := 0
|
||||
for i := len(files) - 1; i >= 0; i-- {
|
||||
fi := files[i]
|
||||
if strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
|
||||
snapshots++
|
||||
require.Equal(t, chunkSnapshotDir(wlast, woffset), fi.Name())
|
||||
}
|
||||
}
|
||||
require.Equal(t, 1, snapshots)
|
||||
|
||||
// Test the replay of snapshot.
|
||||
head.opts.EnableMemorySnapshotOnShutdown = true // Enabled to read from snapshot.
|
||||
openHeadAndCheckReplay()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSnapshotError(t *testing.T) {
|
||||
|
|
|
@ -568,7 +568,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
|
|||
return stats, nil
|
||||
}
|
||||
|
||||
snapshotName := fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)
|
||||
snapshotName := chunkSnapshotDir(wlast, woffset)
|
||||
|
||||
cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
|
||||
cpdirtmp := cpdir + ".tmp"
|
||||
|
@ -690,7 +690,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
|
|||
return stats, errors.Wrap(err, "rename chunk snapshot directory")
|
||||
}
|
||||
|
||||
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, cslast, csoffset); err != nil {
|
||||
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil {
|
||||
// Leftover old chunk snapshots do not cause problems down the line beyond
|
||||
// occupying disk space.
|
||||
// They will just be ignored since a higher chunk snapshot exists.
|
||||
|
@ -699,6 +699,10 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
|
|||
return stats, nil
|
||||
}
|
||||
|
||||
func chunkSnapshotDir(wlast, woffset int) string {
|
||||
return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)
|
||||
}
|
||||
|
||||
func (h *Head) performChunkSnapshot() error {
|
||||
level.Info(h.logger).Log("msg", "creating chunk snapshot")
|
||||
startTime := time.Now()
|
||||
|
@ -723,8 +727,9 @@ func LastChunkSnapshot(dir string) (string, int, int, error) {
|
|||
if err != nil {
|
||||
return "", 0, 0, err
|
||||
}
|
||||
// Traverse list backwards since there may be multiple chunk snapshots left.
|
||||
for i := len(files) - 1; i >= 0; i-- {
|
||||
maxIdx, maxOffset := -1, -1
|
||||
maxFileName := ""
|
||||
for i := 0; i < len(files); i++ {
|
||||
fi := files[i]
|
||||
|
||||
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
|
||||
|
@ -749,9 +754,15 @@ func LastChunkSnapshot(dir string) (string, int, int, error) {
|
|||
continue
|
||||
}
|
||||
|
||||
return filepath.Join(dir, fi.Name()), idx, offset, nil
|
||||
if idx > maxIdx || (idx == maxIdx && offset > maxOffset) {
|
||||
maxIdx, maxOffset = idx, offset
|
||||
maxFileName = filepath.Join(dir, fi.Name())
|
||||
}
|
||||
}
|
||||
return "", 0, 0, record.ErrNotFound
|
||||
if maxFileName == "" {
|
||||
return "", 0, 0, record.ErrNotFound
|
||||
}
|
||||
return maxFileName, maxIdx, maxOffset, nil
|
||||
}
|
||||
|
||||
// DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.
|
||||
|
@ -782,7 +793,7 @@ func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
|
|||
continue
|
||||
}
|
||||
|
||||
if idx <= maxIndex && offset < maxOffset {
|
||||
if idx < maxIndex || (idx == maxIndex && offset < maxOffset) {
|
||||
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
|
||||
errs.Add(err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue