From 5f5c7a44778e6ea01fb82ce9dc707934e98fd3e8 Mon Sep 17 00:00:00 2001 From: zhulongcheng Date: Wed, 18 Mar 2020 23:10:41 +0800 Subject: [PATCH] tsdb: sort checkpoints by segment number (#6987) Signed-off-by: zhulongcheng --- tsdb/wal/checkpoint.go | 81 ++++++++++++++++++++++++------------- tsdb/wal/checkpoint_test.go | 26 +++++++++++- 2 files changed, 77 insertions(+), 30 deletions(-) diff --git a/tsdb/wal/checkpoint.go b/tsdb/wal/checkpoint.go index 6b0ecd54fa..d205c61212 100644 --- a/tsdb/wal/checkpoint.go +++ b/tsdb/wal/checkpoint.go @@ -21,6 +21,7 @@ import ( "math" "os" "path/filepath" + "sort" "strconv" "strings" @@ -44,46 +45,32 @@ type CheckpointStats struct { // LastCheckpoint returns the directory name and index of the most recent checkpoint. // If dir does not contain any checkpoints, ErrNotFound is returned. func LastCheckpoint(dir string) (string, int, error) { - files, err := ioutil.ReadDir(dir) + checkpoints, err := listCheckpoints(dir) if err != nil { return "", 0, err } - // Traverse list backwards since there may be multiple checkpoints left. - for i := len(files) - 1; i >= 0; i-- { - fi := files[i] - if !strings.HasPrefix(fi.Name(), checkpointPrefix) { - continue - } - if !fi.IsDir() { - return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name()) - } - idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) - if err != nil { - continue - } - return filepath.Join(dir, fi.Name()), idx, nil + if len(checkpoints) == 0 { + return "", 0, record.ErrNotFound } - return "", 0, record.ErrNotFound + + checkpoint := checkpoints[len(checkpoints)-1] + return filepath.Join(dir, checkpoint.name), checkpoint.index, nil } // DeleteCheckpoints deletes all checkpoints in a directory below a given index. func DeleteCheckpoints(dir string, maxIndex int) error { - var errs tsdb_errors.MultiError - - files, err := ioutil.ReadDir(dir) + checkpoints, err := listCheckpoints(dir) if err != nil { return err } - for _, fi := range files { - if !strings.HasPrefix(fi.Name(), checkpointPrefix) { - continue + + var errs tsdb_errors.MultiError + for _, checkpoint := range checkpoints { + if checkpoint.index >= maxIndex { + break } - index, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) - if err != nil || index >= maxIndex { - continue - } - if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { + if err := os.RemoveAll(filepath.Join(dir, checkpoint.name)); err != nil { errs.Add(err) } } @@ -130,7 +117,7 @@ func Checkpoint(w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*C defer sgmReader.Close() } - cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to)) + cpdir := checkpointDir(w.Dir(), to) cpdirtmp := cpdir + ".tmp" if err := os.RemoveAll(cpdirtmp); err != nil { @@ -264,3 +251,41 @@ func Checkpoint(w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*C return stats, nil } + +func checkpointDir(dir string, i int) string { + return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i)) +} + +type checkpointRef struct { + name string + index int +} + +func listCheckpoints(dir string) (refs []checkpointRef, err error) { + files, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + + for i := 0; i < len(files); i++ { + fi := files[i] + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { + continue + } + if !fi.IsDir() { + return nil, errors.Errorf("checkpoint %s is not a directory", fi.Name()) + } + idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil { + continue + } + + refs = append(refs, checkpointRef{name: fi.Name(), index: idx}) + } + + sort.Slice(refs, func(i, j int) bool { + return refs[i].index < refs[j].index + }) + + return refs, nil +} diff --git a/tsdb/wal/checkpoint_test.go b/tsdb/wal/checkpoint_test.go index 48ae842367..eb0e22a4b8 100644 --- a/tsdb/wal/checkpoint_test.go +++ b/tsdb/wal/checkpoint_test.go @@ -62,6 +62,18 @@ func TestLastCheckpoint(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, filepath.Join(dir, "checkpoint.1000"), s) testutil.Equals(t, 1000, k) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.99999999"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, filepath.Join(dir, "checkpoint.99999999"), s) + testutil.Equals(t, 99999999, k) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000000"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, filepath.Join(dir, "checkpoint.100000000"), s) + testutil.Equals(t, 100000000, k) } func TestDeleteCheckpoints(t *testing.T) { @@ -83,6 +95,16 @@ func TestDeleteCheckpoints(t *testing.T) { files, err := fileutil.ReadDir(dir) testutil.Ok(t, err) testutil.Equals(t, []string{"checkpoint.02", "checkpoint.03"}, files) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.99999999"), 0777)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000000"), 0777)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000001"), 0777)) + + testutil.Ok(t, DeleteCheckpoints(dir, 100000000)) + + files, err = fileutil.ReadDir(dir) + testutil.Ok(t, err) + testutil.Equals(t, []string{"checkpoint.100000000", "checkpoint.100000001"}, files) } func TestCheckpoint(t *testing.T) { @@ -159,9 +181,9 @@ func TestCheckpoint(t *testing.T) { files, err := fileutil.ReadDir(dir) testutil.Ok(t, err) testutil.Equals(t, 1, len(files)) - testutil.Equals(t, "checkpoint.000106", files[0]) + testutil.Equals(t, "checkpoint.00000106", files[0]) - sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) + sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.00000106")) testutil.Ok(t, err) defer sr.Close()