Merge pull request #6820 from codesome/break-compact

Break DB.Compact and DB.CompactHead and DB.CompactBlocks
This commit is contained in:
Bartlomiej Plotka 2020-02-17 13:20:21 +00:00 committed by GitHub
commit 88af973663
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 43 deletions

View file

@ -324,7 +324,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error {
}
mint := head.MinTime()
maxt := head.MaxTime()
rh := &rangeHead{
rh := &RangeHead{
head: head,
mint: mint,
maxt: maxt,
@ -685,41 +685,61 @@ func (db *DB) Compact() (err error) {
maxt := rangeForTimestamp(mint, db.head.chunkRange)
// Wrap head into a range that bounds all reads to it.
head := &rangeHead{
head: db.head,
mint: mint,
// We remove 1 millisecond from maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). But
// chunk intervals are closed: [c.MinTime, c.MaxTime];
// so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value
// from the block interval here.
maxt: maxt - 1,
// We remove 1 millisecond from maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). But
// chunk intervals are closed: [c.MinTime, c.MaxTime];
// so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value
// from the block interval here.
head := NewRangeHead(db.head, mint, maxt-1)
if err := db.compactHead(head, mint, maxt); err != nil {
return err
}
uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil)
if err != nil {
return errors.Wrap(err, "persist head block")
}
runtime.GC()
if err := db.reload(); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid)
}
return errors.Wrap(err, "reload blocks")
}
if (uid == ulid.ULID{}) {
// Compaction resulted in an empty block.
// Head truncating during db.reload() depends on the persisted blocks and
// in this case no new block will be persisted so manually truncate the head.
if err = db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate failed (in compact)")
}
}
runtime.GC()
}
return db.compactBlocks()
}
// CompactHead compacts the given RangeHead.
// It acquires the compaction mutex for the duration of the call and
// delegates the actual work to compactHead.
func (db *DB) CompactHead(head *RangeHead, mint, maxt int64) (err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()
return db.compactHead(head, mint, maxt)
}
// compactHead compacts the given RangeHead by persisting it as an on-disk
// block and reloading the block list.
// The compaction mutex should be held before calling this method.
func (db *DB) compactHead(head *RangeHead, mint, maxt int64) (err error) {
// Persist the head range as a block; uid identifies the new block on disk.
uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil)
if err != nil {
return errors.Wrap(err, "persist head block")
}
// Release memory held by the write before reloading blocks.
runtime.GC()
if err := db.reload(); err != nil {
// Reload failed: remove the block we just persisted so it is not
// picked up half-registered by a later reload.
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid)
}
return errors.Wrap(err, "reload blocks")
}
// A zero ULID means the compactor wrote no block.
if (uid == ulid.ULID{}) {
// Compaction resulted in an empty block.
// Head truncating during db.reload() depends on the persisted blocks and
// in this case no new block will be persisted so manually truncate the head.
if err = db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate failed (in compact)")
}
}
runtime.GC()
return nil
}
// compactBlocks compacts all the eligible on-disk blocks.
// The compaction mutex should be held before calling this method.
func (db *DB) compactBlocks() (err error) {
// Check for compactions of multiple blocks.
for {
plan, err := db.compactor.Plan(db.dir)
@ -1192,7 +1212,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
mint := db.head.MinTime()
maxt := db.head.MaxTime()
head := &rangeHead{
head := &RangeHead{
head: db.head,
mint: mint,
maxt: maxt,
@ -1221,7 +1241,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
}
}
if maxt >= db.head.MinTime() {
blocks = append(blocks, &rangeHead{
blocks = append(blocks, &RangeHead{
head: db.head,
mint: mint,
maxt: maxt,

View file

@ -748,36 +748,45 @@ func (h *Head) initTime(t int64) (initialized bool) {
return true
}
type rangeHead struct {
// RangeHead wraps a Head and bounds its index and chunk reads to the
// time range [mint, maxt] (see Index and Chunks).
type RangeHead struct {
head *Head
mint, maxt int64
}
func (h *rangeHead) Index() (IndexReader, error) {
// NewRangeHead returns a *RangeHead against the given Head, bounded to
// the time range [mint, maxt].
func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
return &RangeHead{
head: head,
mint: mint,
maxt: maxt,
}
}
// Index returns an IndexReader restricted to the RangeHead's time bounds.
func (h *RangeHead) Index() (IndexReader, error) {
return h.head.indexRange(h.mint, h.maxt), nil
}
func (h *rangeHead) Chunks() (ChunkReader, error) {
// Chunks returns a ChunkReader restricted to the RangeHead's time bounds.
func (h *RangeHead) Chunks() (ChunkReader, error) {
return h.head.chunksRange(h.mint, h.maxt), nil
}
func (h *rangeHead) Tombstones() (tombstones.Reader, error) {
// Tombstones returns the tombstones of the underlying Head.
// Note: unlike Index and Chunks, this is not restricted to [mint, maxt].
func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
return h.head.tombstones, nil
}
func (h *rangeHead) MinTime() int64 {
// MinTime returns the lower time bound of the RangeHead.
func (h *RangeHead) MinTime() int64 {
return h.mint
}
func (h *rangeHead) MaxTime() int64 {
// MaxTime returns the upper time bound of the RangeHead.
func (h *RangeHead) MaxTime() int64 {
return h.maxt
}
func (h *rangeHead) NumSeries() uint64 {
// NumSeries returns the number of series in the underlying Head.
// It delegates to Head.NumSeries and is not restricted to the time range.
func (h *RangeHead) NumSeries() uint64 {
return h.head.NumSeries()
}
func (h *rangeHead) Meta() BlockMeta {
func (h *RangeHead) Meta() BlockMeta {
return BlockMeta{
MinTime: h.MinTime(),
MaxTime: h.MaxTime(),