Merge branch 'master' of https://github.com/prometheus/tsdb into tsdb-delete

This commit is contained in:
codwu 2018-07-06 20:21:32 +08:00
commit 667e539a7a
10 changed files with 299 additions and 139 deletions

View file

@ -164,6 +164,13 @@ type BlockStats struct {
NumTombstones uint64 `json:"numTombstones,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"`
} }
// BlockDesc describes a block by ULID and time range.
type BlockDesc struct {
ULID ulid.ULID `json:"ulid"`
MinTime int64 `json:"minTime"`
MaxTime int64 `json:"maxTime"`
}
// BlockMetaCompaction holds information about compactions a block went through. // BlockMetaCompaction holds information about compactions a block went through.
type BlockMetaCompaction struct { type BlockMetaCompaction struct {
// Maximum number of compaction cycles any source block has // Maximum number of compaction cycles any source block has
@ -171,6 +178,9 @@ type BlockMetaCompaction struct {
Level int `json:"level"` Level int `json:"level"`
// ULIDs of all source head blocks that went into the block. // ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"` Sources []ulid.ULID `json:"sources,omitempty"`
// Short descriptions of the direct blocks that were used to create
// this block.
Parents []BlockDesc `json:"parents,omitempty"`
Failed bool `json:"failed,omitempty"` Failed bool `json:"failed,omitempty"`
} }
@ -437,7 +447,7 @@ Outer:
} }
for _, chk := range chks { for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond. // Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones.addInterval(p.At(), Interval{tmin, tmax}) stones.addInterval(p.At(), Interval{tmin, tmax})
@ -475,19 +485,17 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
pb.tombstones.Iter(func(id uint64, ivs Intervals) error { pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
numStones += len(ivs) numStones += len(ivs)
return nil return nil
}) })
if numStones == 0 { if numStones == 0 {
return nil, nil return nil, nil
} }
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime) meta := pb.Meta()
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &uid, nil return &uid, nil
} }
@ -531,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error {
return nil return nil
} }
// Returns true if the block overlaps [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
// The block itself is a half-open interval
// [pb.meta.MinTime, pb.meta.MaxTime).
return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}
func clampInterval(a, b, mint, maxt int64) (int64, int64) { func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint { if a < mint {
a = mint a = mint

View file

@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error {
return nil return nil
} }
// Returns true if the chunk overlaps [mint, maxt].
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
return cm.MinTime <= maxt && mint <= cm.MaxTime
}
var ( var (
errInvalidSize = fmt.Errorf("invalid size") errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag") errInvalidFlag = fmt.Errorf("invalid flag")
@ -296,7 +302,7 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
} }
// Verify magic number. // Verify magic number.
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
return nil, fmt.Errorf("invalid magic number %x", m) return nil, errors.Errorf("invalid magic number %x", m)
} }
} }
return &cr, nil return &cr, nil
@ -357,8 +363,8 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
r := b.Range(off, off+binary.MaxVarintLen32) r := b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(r) l, n := binary.Uvarint(r)
if n < 0 { if n <= 0 {
return nil, fmt.Errorf("reading chunk length failed") return nil, errors.Errorf("reading chunk length failed with %d", n)
} }
r = b.Range(off+n, off+n+int(l)) r = b.Range(off+n, off+n+int(l))

28
chunks/chunks_test.go Normal file
View file

@ -0,0 +1,28 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"testing"
"github.com/prometheus/tsdb/testutil"
)
func TestReaderWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
r := &Reader{bs: []ByteSlice{b}}
_, err := r.Chunk(0)
testutil.NotOk(t, err)
}

View file

@ -55,7 +55,7 @@ type Compactor interface {
Plan(dir string) ([]string, error) Plan(dir string) ([]string, error)
// Write persists a Block into a directory. // Write persists a Block into a directory.
Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
// Compact runs compaction against the provided directories. Must // Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan(). // only be called concurrently with results of Plan().
@ -297,6 +297,11 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
for _, s := range b.Compaction.Sources { for _, s := range b.Compaction.Sources {
sources[s] = struct{}{} sources[s] = struct{}{}
} }
res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
ULID: b.ULID,
MinTime: b.MinTime,
MaxTime: b.MaxTime,
})
} }
res.Compaction.Level++ res.Compaction.Level++
@ -367,7 +372,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
return uid, merr return uid, merr
} }
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy) uid := ulid.MustNew(ulid.Now(), entropy)
@ -379,6 +384,12 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (
meta.Compaction.Level = 1 meta.Compaction.Level = 1
meta.Compaction.Sources = []ulid.ULID{uid} meta.Compaction.Sources = []ulid.ULID{uid}
if parent != nil {
meta.Compaction.Parents = []BlockDesc{
{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
}
}
err := c.write(dest, meta, b) err := c.write(dest, meta, b)
if err != nil { if err != nil {
return uid, err return uid, err
@ -581,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
if len(dranges) > 0 { if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values. // Re-encode the chunk to not have deleted values.
for i, chk := range chks { for i, chk := range chks {
if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) { if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
continue continue
} }

190
db.go
View file

@ -75,7 +75,7 @@ type Appender interface {
// Returned reference numbers are ephemeral and may be rejected in calls // Returned reference numbers are ephemeral and may be rejected in calls
// to AddFast() at any point. Adding the sample via Add() returns a new // to AddFast() at any point. Adding the sample via Add() returns a new
// reference number. // reference number.
// If the reference is the empty string it must not be used for caching. // If the reference is 0 it must not be used for caching.
Add(l labels.Labels, t int64, v float64) (uint64, error) Add(l labels.Labels, t int64, v float64) (uint64, error)
// Add adds a sample pair for the referenced series. It is generally faster // Add adds a sample pair for the referenced series. It is generally faster
@ -267,17 +267,9 @@ func (db *DB) run() {
case <-db.compactc: case <-db.compactc:
db.metrics.compactionsTriggered.Inc() db.metrics.compactionsTriggered.Inc()
_, err1 := db.retentionCutoff() _, err := db.compact()
if err1 != nil { if err != nil {
level.Error(db.logger).Log("msg", "retention cutoff failed", "err", err1) level.Error(db.logger).Log("msg", "compaction failed", "err", err)
}
_, err2 := db.compact()
if err2 != nil {
level.Error(db.logger).Log("msg", "compaction failed", "err", err2)
}
if err1 != nil || err2 != nil {
backoff = exponential(backoff, 1*time.Second, 1*time.Minute) backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else { } else {
backoff = 0 backoff = 0
@ -289,19 +281,9 @@ func (db *DB) run() {
} }
} }
func (db *DB) retentionCutoff() (b bool, err error) { func (db *DB) beyondRetention(meta *BlockMeta) bool {
defer func() {
if !b && err == nil {
// no data had to be cut off.
return
}
db.metrics.cutoffs.Inc()
if err != nil {
db.metrics.cutoffsFailed.Inc()
}
}()
if db.opts.RetentionDuration == 0 { if db.opts.RetentionDuration == 0 {
return false, nil return false
} }
db.mtx.RLock() db.mtx.RLock()
@ -309,23 +291,13 @@ func (db *DB) retentionCutoff() (b bool, err error) {
db.mtx.RUnlock() db.mtx.RUnlock()
if len(blocks) == 0 { if len(blocks) == 0 {
return false, nil return false
} }
last := blocks[len(db.blocks)-1] last := blocks[len(db.blocks)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
dirs, err := retentionCutoffDirs(db.dir, mint)
if err != nil {
return false, err
}
// This will close the dirs and then delete the dirs. return meta.MaxTime < mint
if len(dirs) > 0 {
return true, db.reload(dirs...)
}
return false, nil
} }
// Appender opens a new appender against the database. // Appender opens a new appender against the database.
@ -354,6 +326,13 @@ func (a dbAppender) Commit() error {
return err return err
} }
// Compact data if possible. After successful compaction blocks are reloaded
// which will also trigger blocks to be deleted that fall out of the retention
// window.
// If no blocks are compacted, the retention window state doesn't change. Thus,
// this is sufficient to reliably delete old data.
// Old blocks are only deleted on reload based on the new block's parent information.
// See DB.reload documentation for further information.
func (db *DB) compact() (changes bool, err error) { func (db *DB) compact() (changes bool, err error) {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
@ -381,9 +360,15 @@ func (db *DB) compact() (changes bool, err error) {
head := &rangeHead{ head := &rangeHead{
head: db.head, head: db.head,
mint: mint, mint: mint,
maxt: maxt, // We remove 1 millisecond from maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). But
// chunk intervals are closed: [c.MinTime, c.MaxTime];
// so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value
// from the block interval here.
maxt: maxt - 1,
} }
if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil { if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
return changes, errors.Wrap(err, "persist head block") return changes, errors.Wrap(err, "persist head block")
} }
changes = true changes = true
@ -418,7 +403,7 @@ func (db *DB) compact() (changes bool, err error) {
changes = true changes = true
runtime.GC() runtime.GC()
if err := db.reload(plan...); err != nil { if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks") return changes, errors.Wrap(err, "reload blocks")
} }
runtime.GC() runtime.GC()
@ -427,39 +412,6 @@ func (db *DB) compact() (changes bool, err error) {
return changes, nil return changes, nil
} }
// retentionCutoffDirs returns all directories of blocks in dir that are strictly
// before mint.
func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
df, err := fileutil.OpenDir(dir)
if err != nil {
return nil, errors.Wrapf(err, "open directory")
}
defer df.Close()
dirs, err := blockDirs(dir)
if err != nil {
return nil, errors.Wrapf(err, "list block dirs %s", dir)
}
delDirs := []string{}
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
return nil, errors.Wrapf(err, "read block meta %s", dir)
}
// The first block we encounter marks that we crossed the boundary
// of deletable blocks.
if meta.MaxTime >= mint {
break
}
delDirs = append(delDirs, dir)
}
return delDirs, nil
}
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
for _, b := range db.blocks { for _, b := range db.blocks {
if b.Meta().ULID == id { if b.Meta().ULID == id {
@ -469,18 +421,10 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
return nil, false return nil, false
} }
func stringsContain(set []string, elem string) bool {
for _, e := range set {
if elem == e {
return true
}
}
return false
}
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes // reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
// a list of block directories which should be deleted during reload. // a list of block directories which should be deleted during reload.
func (db *DB) reload(deleteable ...string) (err error) { // Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
db.metrics.reloadsFailed.Inc() db.metrics.reloadsFailed.Inc()
@ -492,21 +436,58 @@ func (db *DB) reload(deleteable ...string) (err error) {
if err != nil { if err != nil {
return errors.Wrap(err, "find blocks") return errors.Wrap(err, "find blocks")
} }
// We delete old blocks that have been superseded by new ones by gathering all parents
// from existing blocks. Those parents all have newer replacements and can be safely deleted
// after we loaded the other blocks.
// This makes us resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically. By creating
// blocks with their parents, we can pick up the deletion where it left off during a crash.
var ( var (
blocks []*Block blocks []*Block
exist = map[ulid.ULID]struct{}{} corrupted = map[ulid.ULID]error{}
opened = map[ulid.ULID]struct{}{}
deleteable = map[ulid.ULID]struct{}{}
) )
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
// The block was potentially in the middle of being deleted during a crash.
// Skip it since we may delete it properly further down again.
level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
ulid, err2 := ulid.Parse(filepath.Base(dir))
if err2 != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
continue
}
corrupted[ulid] = err
continue
}
if db.beyondRetention(meta) {
deleteable[meta.ULID] = struct{}{}
continue
}
for _, b := range meta.Compaction.Parents {
deleteable[b.ULID] = struct{}{}
}
}
// Blocks we failed to open should all be those we are want to delete anyway.
for c, err := range corrupted {
if _, ok := deleteable[c]; !ok {
return errors.Wrapf(err, "unexpected corrupted block %s", c)
}
}
// Load new blocks into memory.
for _, dir := range dirs { for _, dir := range dirs {
meta, err := readMetaFile(dir) meta, err := readMetaFile(dir)
if err != nil { if err != nil {
return errors.Wrapf(err, "read meta information %s", dir) return errors.Wrapf(err, "read meta information %s", dir)
} }
// If the block is pending for deletion, don't add it to the new block set. // Don't load blocks that are scheduled for deletion.
if stringsContain(deleteable, dir) { if _, ok := deleteable[meta.ULID]; ok {
continue continue
} }
// See if we already have the block in memory or open it otherwise.
b, ok := db.getBlock(meta.ULID) b, ok := db.getBlock(meta.ULID)
if !ok { if !ok {
b, err = OpenBlock(dir, db.chunkPool) b, err = OpenBlock(dir, db.chunkPool)
@ -514,9 +495,8 @@ func (db *DB) reload(deleteable ...string) (err error) {
return errors.Wrapf(err, "open block %s", dir) return errors.Wrapf(err, "open block %s", dir)
} }
} }
blocks = append(blocks, b) blocks = append(blocks, b)
exist[meta.ULID] = struct{}{} opened[meta.ULID] = struct{}{}
} }
sort.Slice(blocks, func(i, j int) bool { sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
@ -533,15 +513,19 @@ func (db *DB) reload(deleteable ...string) (err error) {
db.blocks = blocks db.blocks = blocks
db.mtx.Unlock() db.mtx.Unlock()
// Drop old blocks from memory.
for _, b := range oldBlocks { for _, b := range oldBlocks {
if _, ok := exist[b.Meta().ULID]; ok { if _, ok := opened[b.Meta().ULID]; ok {
continue continue
} }
if err := b.Close(); err != nil { if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing block failed", "err", err) level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
} }
if err := os.RemoveAll(b.Dir()); err != nil { }
level.Warn(db.logger).Log("msg", "deleting block failed", "err", err) // Delete all obsolete blocks. None of them are opened any longer.
for ulid := range deleteable {
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
return errors.Wrapf(err, "delete obsolete block %s", ulid)
} }
} }
@ -765,7 +749,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
if !withHead { if !withHead {
return nil return nil
} }
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil)
return errors.Wrap(err, "snapshot head block") return errors.Wrap(err, "snapshot head block")
} }
@ -778,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
defer db.mtx.RUnlock() defer db.mtx.RUnlock()
for _, b := range db.blocks { for _, b := range db.blocks {
m := b.Meta() if b.OverlapsClosedInterval(mint, maxt) {
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
blocks = append(blocks, b) blocks = append(blocks, b)
} }
} }
@ -821,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
defer db.mtx.RUnlock() defer db.mtx.RUnlock()
for _, b := range db.blocks { for _, b := range db.blocks {
m := b.Meta() if b.OverlapsClosedInterval(mint, maxt) {
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
g.Go(func(b *Block) func() error { g.Go(func(b *Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) } return func() error { return b.Delete(mint, maxt, ms...) }
}(b)) }(b))
@ -859,27 +841,15 @@ func (db *DB) CleanTombstones() (err error) {
blocks := db.blocks[:] blocks := db.blocks[:]
db.mtx.RUnlock() db.mtx.RUnlock()
deletable := []string{}
for _, b := range blocks { for _, b := range blocks {
if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil { if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
err = errors.Wrapf(er, "clean tombstones: %s", b.Dir()) err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
return err return err
} else if uid != nil { // New block was created. } else if uid != nil { // New block was created.
deletable = append(deletable, b.Dir())
newUIDs = append(newUIDs, *uid) newUIDs = append(newUIDs, *uid)
} }
} }
return errors.Wrap(db.reload(), "reload blocks")
if len(deletable) == 0 {
return nil
}
return errors.Wrap(db.reload(deletable...), "reload blocks")
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax
} }
func isBlockDir(fi os.FileInfo) bool { func isBlockDir(fi os.FileInfo) bool {

View file

@ -26,6 +26,8 @@ import (
"github.com/oklog/ulid" "github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
@ -842,7 +844,7 @@ type mockCompactorFailing struct {
func (*mockCompactorFailing) Plan(dir string) ([]string, error) { func (*mockCompactorFailing) Plan(dir string) ([]string, error) {
return nil, nil return nil, nil
} }
func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
if len(c.blocks) >= c.max { if len(c.blocks) >= c.max {
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
} }
@ -925,10 +927,8 @@ func TestDB_Retention(t *testing.T) {
testutil.Equals(t, 2, len(db.blocks)) testutil.Equals(t, 2, len(db.blocks))
// Now call retention. // Reload blocks, which should drop blocks beyond the retention boundary.
changes, err := db.retentionCutoff() testutil.Ok(t, db.reload())
testutil.Ok(t, err)
testutil.Assert(t, changes, "there should be changes")
testutil.Equals(t, 1, len(db.blocks)) testutil.Equals(t, 1, len(db.blocks))
testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
} }
@ -1098,3 +1098,90 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
{Min: 8, Max: 9}: {nc1[8], nc1[9]}, // 7-10, 8-9 {Min: 8, Max: 9}: {nc1[8], nc1[9]}, // 7-10, 8-9
}, OverlappingBlocks(nc1)) }, OverlappingBlocks(nc1))
} }
// Regression test for https://github.com/prometheus/tsdb/issues/347
func TestChunkAtBlockBoundary(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
app := db.Appender()
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
for i := int64(0); i < 3; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
_, err = app.Add(label, i*blockRange+1000, 0)
testutil.Ok(t, err)
}
err := app.Commit()
testutil.Ok(t, err)
_, err = db.compact()
testutil.Ok(t, err)
for _, block := range db.blocks {
r, err := block.Index()
testutil.Ok(t, err)
defer r.Close()
meta := block.Meta()
p, err := r.Postings(index.AllPostingsKey())
testutil.Ok(t, err)
var (
lset labels.Labels
chks []chunks.Meta
)
chunkCount := 0
for p.Next() {
err = r.Series(p.At(), &lset, &chks)
testutil.Ok(t, err)
for _, c := range chks {
testutil.Assert(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,
"chunk spans beyond block boundaries: [block.MinTime=%d, block.MaxTime=%d]; [chunk.MinTime=%d, chunk.MaxTime=%d]",
meta.MinTime, meta.MaxTime, c.MinTime, c.MaxTime)
chunkCount++
}
}
testutil.Assert(t, chunkCount == 1, "expected 1 chunk in block %s, got %d", meta.ULID, chunkCount)
}
}
func TestQuerierWithBoundaryChunks(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
app := db.Appender()
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
for i := int64(0); i < 5; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
}
err := app.Commit()
testutil.Ok(t, err)
_, err = db.compact()
testutil.Ok(t, err)
testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB")
q, err := db.Querier(blockRange, 2*blockRange)
testutil.Ok(t, err)
defer q.Close()
// The requested interval covers 2 blocks, so the querier should contain 2 blocks.
count := len(q.(*querier).blocks)
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
}

25
head.go
View file

@ -521,7 +521,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
} }
func (a *headAppender) Commit() error { func (a *headAppender) Commit() error {
defer a.Rollback() defer a.head.metrics.activeAppenders.Dec()
defer a.head.putAppendBuffer(a.samples)
if err := a.head.wal.LogSeries(a.series); err != nil { if err := a.head.wal.LogSeries(a.series); err != nil {
return err return err
@ -565,7 +566,9 @@ func (a *headAppender) Rollback() error {
a.head.metrics.activeAppenders.Dec() a.head.metrics.activeAppenders.Dec()
a.head.putAppendBuffer(a.samples) a.head.putAppendBuffer(a.samples)
return nil // Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case.
return a.head.wal.LogSeries(a.series)
} }
// Delete all samples in the range of [mint, maxt] for series that satisfy the given // Delete all samples in the range of [mint, maxt] for series that satisfy the given
@ -732,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
s.Lock() s.Lock()
c := s.chunk(int(cid)) c := s.chunk(int(cid))
// This means that the chunk has been garbage collected. // This means that the chunk has been garbage collected or is outside
if c == nil { // the specified range.
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock() s.Unlock()
return nil, ErrNotFound return nil, ErrNotFound
} }
mint, maxt := c.minTime, c.maxTime
s.Unlock() s.Unlock()
// Do not expose chunks that are outside of the specified range.
if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) {
return nil, ErrNotFound
}
return &safeChunk{ return &safeChunk{
Chunk: c.chunk, Chunk: c.chunk,
s: s, s: s,
@ -849,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
for i, c := range s.chunks { for i, c := range s.chunks {
// Do not expose chunks that are outside of the specified range. // Do not expose chunks that are outside of the specified range.
if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue continue
} }
*chks = append(*chks, chunks.Meta{ *chks = append(*chks, chunks.Meta{
@ -1288,6 +1286,11 @@ type memChunk struct {
minTime, maxTime int64 minTime, maxTime int64
} }
// Returns true if the chunk overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
}
type memSafeIterator struct { type memSafeIterator struct {
chunkenc.Iterator chunkenc.Iterator

View file

@ -47,6 +47,21 @@ type memoryWAL struct {
entries []interface{} entries []interface{}
} }
func (w *memoryWAL) LogSeries(s []RefSeries) error {
w.entries = append(w.entries, s)
return nil
}
func (w *memoryWAL) LogSamples(s []RefSample) error {
w.entries = append(w.entries, s)
return nil
}
func (w *memoryWAL) LogDeletes(s []Stone) error {
w.entries = append(w.entries, s)
return nil
}
func (w *memoryWAL) Reader() WALReader { func (w *memoryWAL) Reader() WALReader {
return w return w
} }
@ -769,3 +784,20 @@ func TestGCSeriesAccess(t *testing.T) {
_, err = cr.Chunk(chunks[1].Ref) _, err = cr.Chunk(chunks[1].Ref)
testutil.Equals(t, ErrNotFound, err) testutil.Equals(t, ErrNotFound, err)
} }
func TestHead_LogRollback(t *testing.T) {
w := &memoryWAL{}
h, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
app := h.Appender()
_, err = app.Add(labels.FromStrings("a", "b"), 1, 2)
testutil.Ok(t, err)
testutil.Ok(t, app.Rollback())
testutil.Equals(t, 1, len(w.entries))
series, ok := w.entries[0].([]RefSeries)
testutil.Assert(t, ok, "expected series record but got %+v", w.entries[0])
testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}})
}

View file

@ -740,8 +740,8 @@ func (r *Reader) decbufUvarintAt(off int) decbuf {
b := r.b.Range(off, off+binary.MaxVarintLen32) b := r.b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b) l, n := binary.Uvarint(b)
if n > binary.MaxVarintLen32 { if n <= 0 || n > binary.MaxVarintLen32 {
return decbuf{e: errors.New("invalid uvarint")} return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
} }
if r.b.Len() < off+n+int(l)+4 { if r.b.Len() < off+n+int(l)+4 {

View file

@ -380,3 +380,11 @@ func TestPersistence_index_e2e(t *testing.T) {
testutil.Ok(t, ir.Close()) testutil.Ok(t, ir.Close())
} }
func TestReaderWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
r := &Reader{b: b}
db := r.decbufUvarintAt(0)
testutil.NotOk(t, db.err())
}