tsdb: sort checkpoints by segment number (#6987)

Signed-off-by: zhulongcheng <zhulongcheng.dev@gmail.com>
This commit is contained in:
zhulongcheng 2020-03-18 23:10:41 +08:00 committed by GitHub
parent 012161d90d
commit 5f5c7a4477
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 30 deletions

View file

@ -21,6 +21,7 @@ import (
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
@ -44,46 +45,32 @@ type CheckpointStats struct {
// LastCheckpoint returns the directory name and index of the most recent checkpoint.
// If dir does not contain any checkpoints, ErrNotFound is returned.
func LastCheckpoint(dir string) (string, int, error) {
files, err := ioutil.ReadDir(dir)
checkpoints, err := listCheckpoints(dir)
if err != nil {
return "", 0, err
}
// Traverse list backwards since there may be multiple checkpoints left.
for i := len(files) - 1; i >= 0; i-- {
fi := files[i]
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
if !fi.IsDir() {
return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
}
idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {
continue
}
return filepath.Join(dir, fi.Name()), idx, nil
if len(checkpoints) == 0 {
return "", 0, record.ErrNotFound
}
return "", 0, record.ErrNotFound
checkpoint := checkpoints[len(checkpoints)-1]
return filepath.Join(dir, checkpoint.name), checkpoint.index, nil
}
// DeleteCheckpoints deletes all checkpoints in a directory below a given index.
func DeleteCheckpoints(dir string, maxIndex int) error {
var errs tsdb_errors.MultiError
files, err := ioutil.ReadDir(dir)
checkpoints, err := listCheckpoints(dir)
if err != nil {
return err
}
for _, fi := range files {
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
var errs tsdb_errors.MultiError
for _, checkpoint := range checkpoints {
if checkpoint.index >= maxIndex {
break
}
index, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil || index >= maxIndex {
continue
}
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
if err := os.RemoveAll(filepath.Join(dir, checkpoint.name)); err != nil {
errs.Add(err)
}
}
@ -130,7 +117,7 @@ func Checkpoint(w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*C
defer sgmReader.Close()
}
cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
cpdir := checkpointDir(w.Dir(), to)
cpdirtmp := cpdir + ".tmp"
if err := os.RemoveAll(cpdirtmp); err != nil {
@ -264,3 +251,41 @@ func Checkpoint(w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*C
return stats, nil
}
func checkpointDir(dir string, i int) string {
return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i))
}
type checkpointRef struct {
name string
index int
}
func listCheckpoints(dir string) (refs []checkpointRef, err error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
for i := 0; i < len(files); i++ {
fi := files[i]
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
if !fi.IsDir() {
return nil, errors.Errorf("checkpoint %s is not a directory", fi.Name())
}
idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {
continue
}
refs = append(refs, checkpointRef{name: fi.Name(), index: idx})
}
sort.Slice(refs, func(i, j int) bool {
return refs[i].index < refs[j].index
})
return refs, nil
}

View file

@ -62,6 +62,18 @@ func TestLastCheckpoint(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, filepath.Join(dir, "checkpoint.1000"), s)
testutil.Equals(t, 1000, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.99999999"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, filepath.Join(dir, "checkpoint.99999999"), s)
testutil.Equals(t, 99999999, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000000"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, filepath.Join(dir, "checkpoint.100000000"), s)
testutil.Equals(t, 100000000, k)
}
func TestDeleteCheckpoints(t *testing.T) {
@ -83,6 +95,16 @@ func TestDeleteCheckpoints(t *testing.T) {
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, []string{"checkpoint.02", "checkpoint.03"}, files)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.99999999"), 0777))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000000"), 0777))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.100000001"), 0777))
testutil.Ok(t, DeleteCheckpoints(dir, 100000000))
files, err = fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, []string{"checkpoint.100000000", "checkpoint.100000001"}, files)
}
func TestCheckpoint(t *testing.T) {
@ -159,9 +181,9 @@ func TestCheckpoint(t *testing.T) {
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(files))
testutil.Equals(t, "checkpoint.000106", files[0])
testutil.Equals(t, "checkpoint.00000106", files[0])
sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.00000106"))
testutil.Ok(t, err)
defer sr.Close()