Add initial retention cutoff

This commit is contained in:
Fabian Reinartz 2017-02-09 17:54:26 -08:00
parent f1435f2e2c
commit 9c76624df2
2 changed files with 49 additions and 11 deletions

View file

@ -90,7 +90,13 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
dir := filepath.Join(b.outPath, "storage")
st, err := tsdb.OpenPartitioned(dir, 1, nil, nil)
st, err := tsdb.OpenPartitioned(dir, 1, nil, &tsdb.Options{
WALFlushInterval: 5 * time.Second,
RetentionDuration: 1 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 16 * 60 * 60 * 1000, // 1 days in milliseconds
AppendableBlocks: 2,
})
if err != nil {
exitWithError(err)
}
@ -122,7 +128,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
dur := measureTime("ingestScrapes", func() {
b.startProfiling()
total, err = b.ingestScrapes(metrics, 3000)
total, err = b.ingestScrapes(metrics, 4000)
if err != nil {
exitWithError(err)
}

50
db.go
View file

@ -28,10 +28,11 @@ import (
// DefaultOptions used for the DB. They are sane for setups using
// millisecond precision timestampdb.
var DefaultOptions = &Options{
WALFlushInterval: 5 * time.Second,
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
AppendableBlocks: 2,
WALFlushInterval: 5 * time.Second,
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
AppendableBlocks: 2,
}
// Options of the DB storage.
@ -39,6 +40,9 @@ type Options struct {
// The interval at which the write ahead log is flushed to disc.
WALFlushInterval time.Duration
// Duration of persisted data to keep.
RetentionDuration uint64
// The timestamp range of head blocks after which they get persisted.
// It's the minimum duration of any persisted block.
MinBlockDuration uint64
@ -284,6 +288,34 @@ func (db *DB) compact(i, j int) error {
return errors.Wrap(err, "removing old block")
}
}
return db.retentionCutoff()
}
func (db *DB) retentionCutoff() error {
h := db.heads[len(db.heads)-1]
t := h.meta.MinTime - int64(db.opts.RetentionDuration)
var (
blocks = db.blocks()
i int
b Block
)
for i, b = range blocks {
if b.Meta().MinTime >= t {
break
}
}
if i <= 1 {
return nil
}
db.logger.Log("msg", "retention cutoff", "idx", i-1)
db.removeBlocks(0, i)
for _, b := range blocks[:i] {
if err := os.RemoveAll(b.Dir()); err != nil {
return errors.Wrap(err, "removing old block")
}
}
return nil
}
@ -566,7 +598,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
func (db *DB) cut(mint int64) (*headBlock, error) {
maxt := mint + int64(db.opts.MinBlockDuration)
dir, seq, err := nextBlockDir(db.dir)
dir, seq, err := nextSequenceDir(db.dir, "b-")
if err != nil {
return nil, err
}
@ -616,7 +648,7 @@ func blockDirs(dir string) ([]string, error) {
return dirs, nil
}
func nextBlockDir(dir string) (string, int, error) {
func nextSequenceDir(dir, prefix string) (string, int, error) {
names, err := fileutil.ReadDir(dir)
if err != nil {
return "", 0, err
@ -624,16 +656,16 @@ func nextBlockDir(dir string) (string, int, error) {
i := uint64(0)
for _, n := range names {
if !strings.HasPrefix(n, "b-") {
if !strings.HasPrefix(n, prefix) {
continue
}
j, err := strconv.ParseUint(n[2:], 10, 32)
j, err := strconv.ParseUint(n[len(prefix):], 10, 32)
if err != nil {
continue
}
i = j
}
return filepath.Join(dir, fmt.Sprintf("b-%0.6d", i+1)), int(i + 1), nil
return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil
}
// PartitionedDB is a time series storage.