Trigger reload correctly on interrupted compaction

This commit is contained in:
Fabian Reinartz 2017-03-20 10:41:43 +01:00
parent 2c999836fb
commit 3635569257
4 changed files with 20 additions and 10 deletions

View file

@ -2,6 +2,7 @@ package tsdb
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@ -174,6 +175,10 @@ func (pb *persistedBlock) Close() error {
return merr.Err()
}
func (pb *persistedBlock) String() string {
return fmt.Sprintf("(%d, %s)", pb.meta.Sequence, pb.meta.ULID)
}
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
return &blockQuerier{
mint: mint,

View file

@ -1,6 +1,7 @@
package tsdb
import (
"fmt"
"math/rand"
"os"
"path/filepath"
@ -8,6 +9,7 @@ import (
"github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -32,6 +34,7 @@ type Compactor interface {
// compactor implements the Compactor interface.
type compactor struct {
metrics *compactorMetrics
logger log.Logger
opts *compactorOptions
}
@ -71,9 +74,10 @@ type compactorOptions struct {
maxBlockRange uint64
}
func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
func newCompactor(r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
return &compactor{
opts: opts,
logger: l,
metrics: newCompactorMetrics(r),
}
}
@ -178,6 +182,8 @@ func (c *compactor) Write(dir string, b Block) error {
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *compactor) write(dir string, blocks ...Block) (err error) {
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
defer func(t time.Time) {
if err != nil {
c.metrics.failed.Inc()

13
db.go
View file

@ -174,7 +174,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
donec: make(chan struct{}),
stopc: make(chan struct{}),
}
db.compactor = newCompactor(r, &compactorOptions{
db.compactor = newCompactor(r, l, &compactorOptions{
maxBlockRange: opts.MaxBlockDuration,
})
@ -269,14 +269,10 @@ func (db *DB) compact() (changes bool, err error) {
db.headmtx.RUnlock()
db.logger.Log("msg", "picked singles", "singles", fmt.Sprintf("%v", singles))
Loop:
for _, h := range singles {
db.logger.Log("msg", "write head", "seq", h.Meta().Sequence, "dir", h.Dir(), "ulid", h.Meta().ULID)
select {
case <-db.stopc:
break Loop
return changes, nil
default:
}
@ -295,16 +291,15 @@ Loop:
select {
case <-db.stopc:
return false, nil
return changes, nil
default:
}
// We just execute compactions sequentially to not cause too extreme
// CPU and memory spikes.
// TODO(fabxc): return more descriptive plans in the future that allow
// estimation of resource usage and conditional parallelization?
for _, p := range plans {
db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p))
if err := db.compactor.Compact(p...); err != nil {
return changes, errors.Wrapf(err, "compact %s", p)
}

View file

@ -135,6 +135,10 @@ func (h *headBlock) inBounds(t int64) bool {
return t >= h.meta.MinTime && t <= h.meta.MaxTime
}
func (h *headBlock) String() string {
return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID)
}
// Close syncs all data and closes underlying resources of the head block.
func (h *headBlock) Close() error {
h.mtx.Lock()