mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
no overlapping on compaction when an existing block is not within default boundaries. (#461)
closes https://github.com/prometheus/prometheus/issues/4643 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
parent
83a4ef1382
commit
bac9cbed2e
|
@ -3,3 +3,5 @@
|
|||
- `LastCheckpoint` used to return just the segment name and now it returns the full relative path.
|
||||
- `NewSegmentsRangeReader` can now read over multiple wal ranges by using the new `SegmentRange` struct.
|
||||
- `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors.
|
||||
- `Head.Init()` is changed to `Head.Init(minValidTime int64)` where `minValidTime` is taken from the maxt of the last persisted block and any samples below `minValidTime` will not be loaded from the wal in the head. The same value is used when using the `Head.Appender()` to disallow adding samples below the `minValidTime` timestamp. This change was necessary to fix a bug where a `Snapshot()` with the head included would create a block with a custom time range (not bound to the default time ranges) and the next block population from the head would create an overlapping block.
|
||||
- https://github.com/prometheus/tsdb/issues/446
|
|
@ -77,8 +77,9 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
|
|||
return b
|
||||
}
|
||||
|
||||
// createPopulatedBlock creates a block with nSeries series, and nSamples samples.
|
||||
func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block {
|
||||
// createPopulatedBlock creates a block with nSeries series, filled with
|
||||
// samples of the given mint,maxt time range.
|
||||
func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block {
|
||||
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
|
||||
testutil.Ok(tb, err)
|
||||
defer head.Close()
|
||||
|
@ -87,12 +88,11 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo
|
|||
testutil.Ok(tb, err)
|
||||
refs := make([]uint64, nSeries)
|
||||
|
||||
for n := 0; n < nSamples; n++ {
|
||||
for ts := mint; ts <= maxt; ts++ {
|
||||
app := head.Appender()
|
||||
ts := n * 1000
|
||||
for i, lbl := range lbls {
|
||||
if refs[i] != 0 {
|
||||
err := app.AddFast(refs[i], int64(ts), rand.Float64())
|
||||
err := app.AddFast(refs[i], ts, rand.Float64())
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -733,7 +733,7 @@ func TestDisableAutoCompactions(t *testing.T) {
|
|||
case db.compactc <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
for x := 0; x < 10; x++ {
|
||||
for x := 0; x < 20; x++ {
|
||||
if len(db.Blocks()) > 0 {
|
||||
break
|
||||
}
|
||||
|
|
23
db.go
23
db.go
|
@ -271,12 +271,21 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := db.head.Init(); err != nil {
|
||||
return nil, errors.Wrap(err, "read WAL")
|
||||
}
|
||||
|
||||
if err := db.reload(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Set the min valid time for the ingested samples
|
||||
// to be no lower than the maxt of the last block.
|
||||
blocks := db.Blocks()
|
||||
minValidTime := int64(math.MinInt64)
|
||||
if len(blocks) > 0 {
|
||||
minValidTime = blocks[len(blocks)-1].Meta().MaxTime
|
||||
}
|
||||
|
||||
if err := db.head.Init(minValidTime); err != nil {
|
||||
return nil, errors.Wrap(err, "read WAL")
|
||||
}
|
||||
|
||||
go db.run()
|
||||
|
||||
|
@ -395,7 +404,8 @@ func (db *DB) compact() (err error) {
|
|||
if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 {
|
||||
break
|
||||
}
|
||||
mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0])
|
||||
mint := db.head.MinTime()
|
||||
maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0])
|
||||
|
||||
// Wrap head into a range that bounds all reads to it.
|
||||
head := &rangeHead{
|
||||
|
@ -826,9 +836,8 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
|
|||
return sq, nil
|
||||
}
|
||||
|
||||
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
|
||||
mint = (t / width) * width
|
||||
return mint, mint + width
|
||||
func rangeForTimestamp(t int64, width int64) (maxt int64) {
|
||||
return (t/width)*width + width
|
||||
}
|
||||
|
||||
// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
|
||||
|
|
105
db_test.go
105
db_test.go
|
@ -25,6 +25,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/oklog/ulid"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -1199,6 +1200,11 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
|
|||
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
|
||||
}
|
||||
|
||||
// TestInitializeHeadTimestamp ensures that the h.minTime is set properly.
|
||||
// - no blocks no WAL: set to the time of the first appended sample
|
||||
// - no blocks with WAL: set to the smallest sample from the WAL
|
||||
// - with blocks no WAL: set to the last block maxT
|
||||
// - with blocks with WAL: same as above
|
||||
func TestInitializeHeadTimestamp(t *testing.T) {
|
||||
t.Run("clean", func(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "test_head_init")
|
||||
|
@ -1441,3 +1447,102 @@ func TestCorrectNumTombstones(t *testing.T) {
|
|||
testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar")))
|
||||
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
|
||||
}
|
||||
|
||||
// TestBlockRanges checks the following use cases:
|
||||
// - No samples can be added with timestamps lower than the last block maxt.
|
||||
// - The compactor doesn't create overlapping blocks
|
||||
// even when the last block is not within the default boundaries.
|
||||
// - Lower boundary is based on the smallest sample in the head and
|
||||
// upper boundary is rounded to the configured block range.
|
||||
//
|
||||
// This ensures that a snapshot that includes the head and creates a block with a custom time range
|
||||
// will not overlap with the first block created by the next compaction.
|
||||
func TestBlockRanges(t *testing.T) {
|
||||
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
|
||||
|
||||
dir, err := ioutil.TempDir("", "test_storage")
|
||||
if err != nil {
|
||||
t.Fatalf("Opening test dir failed: %s", err)
|
||||
}
|
||||
|
||||
rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1
|
||||
|
||||
// Test that the compactor doesn't create overlapping blocks
|
||||
// when a non standard block already exists.
|
||||
firstBlockMaxT := int64(3)
|
||||
createPopulatedBlock(t, dir, 1, 0, firstBlockMaxT)
|
||||
db, err := Open(dir, logger, nil, DefaultOptions)
|
||||
if err != nil {
|
||||
t.Fatalf("Opening test storage failed: %s", err)
|
||||
}
|
||||
defer func() {
|
||||
os.RemoveAll(dir)
|
||||
}()
|
||||
app := db.Appender()
|
||||
lbl := labels.Labels{{"a", "b"}}
|
||||
_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
|
||||
if err == nil {
|
||||
t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
|
||||
}
|
||||
_, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64())
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64())
|
||||
testutil.Ok(t, err)
|
||||
secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction
|
||||
_, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction
|
||||
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
for x := 1; x < 10; x++ {
|
||||
if len(db.Blocks()) == 2 {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout")
|
||||
|
||||
if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime {
|
||||
t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
|
||||
}
|
||||
|
||||
// Test that wal records are skipped when an existing block covers the same time ranges
|
||||
// and compaction doesn't create an overlapping block.
|
||||
db.DisableCompactions()
|
||||
_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64())
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64())
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64())
|
||||
testutil.Ok(t, app.Commit())
|
||||
testutil.Ok(t, db.Close())
|
||||
|
||||
thirdBlockMaxt := secondBlockMaxt + 2
|
||||
createPopulatedBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt)
|
||||
|
||||
db, err = Open(dir, logger, nil, DefaultOptions)
|
||||
if err != nil {
|
||||
t.Fatalf("Opening test storage failed: %s", err)
|
||||
}
|
||||
defer db.Close()
|
||||
testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
|
||||
testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")
|
||||
|
||||
app = db.Appender()
|
||||
_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, app.Commit())
|
||||
for x := 1; x < 10; x++ {
|
||||
if len(db.Blocks()) == 4 {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout")
|
||||
|
||||
if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime {
|
||||
t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
|
||||
}
|
||||
}
|
||||
|
|
35
head.go
35
head.go
|
@ -59,7 +59,8 @@ type Head struct {
|
|||
appendPool sync.Pool
|
||||
bytesPool sync.Pool
|
||||
|
||||
minTime, maxTime int64
|
||||
minTime, maxTime int64 // Current min and max of the samples included in the head.
|
||||
minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
|
||||
lastSeriesID uint64
|
||||
|
||||
// All series addressable by their ID or hash.
|
||||
|
@ -300,13 +301,6 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
|
|||
}
|
||||
|
||||
func (h *Head) loadWAL(r *wal.Reader) error {
|
||||
minValidTime := h.MinTime()
|
||||
// If the min time is still uninitialized (no persisted blocks yet),
|
||||
// we accept all sample timestamps from the WAL.
|
||||
if minValidTime == math.MaxInt64 {
|
||||
minValidTime = math.MinInt64
|
||||
}
|
||||
|
||||
// Track number of samples that referenced a series we don't know about
|
||||
// for error reporting.
|
||||
var unknownRefs uint64
|
||||
|
@ -327,7 +321,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
|
|||
inputs[i] = make(chan []RefSample, 300)
|
||||
|
||||
go func(input <-chan []RefSample, output chan<- []RefSample) {
|
||||
unknown := h.processWALSamples(minValidTime, input, output)
|
||||
unknown := h.processWALSamples(h.minValidTime, input, output)
|
||||
atomic.AddUint64(&unknownRefs, unknown)
|
||||
wg.Done()
|
||||
}(inputs[i], outputs[i])
|
||||
|
@ -410,7 +404,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
|
|||
}
|
||||
for _, s := range tstones {
|
||||
for _, itv := range s.intervals {
|
||||
if itv.Maxt < minValidTime {
|
||||
if itv.Maxt < h.minValidTime {
|
||||
continue
|
||||
}
|
||||
h.tombstones.addInterval(s.ref, itv)
|
||||
|
@ -443,8 +437,12 @@ func (h *Head) loadWAL(r *wal.Reader) error {
|
|||
}
|
||||
|
||||
// Init loads data from the write ahead log and prepares the head for writes.
|
||||
func (h *Head) Init() error {
|
||||
// It should be called before using an appender so that
|
||||
// it limits the ingested samples to the head min valid time.
|
||||
func (h *Head) Init(minValidTime int64) error {
|
||||
h.minValidTime = minValidTime
|
||||
defer h.postings.EnsureOrder()
|
||||
defer h.gc() // After loading the wal remove the obsolete data from the head.
|
||||
|
||||
if h.wal == nil {
|
||||
return nil
|
||||
|
@ -486,6 +484,7 @@ func (h *Head) Init() error {
|
|||
if err := h.wal.Repair(err); err != nil {
|
||||
return errors.Wrap(err, "repair corrupted WAL")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -502,6 +501,7 @@ func (h *Head) Truncate(mint int64) (err error) {
|
|||
return nil
|
||||
}
|
||||
atomic.StoreInt64(&h.minTime, mint)
|
||||
h.minValidTime = mint
|
||||
|
||||
// Ensure that max time is at least as high as min time.
|
||||
for h.MaxTime() < mint {
|
||||
|
@ -655,13 +655,22 @@ func (h *Head) Appender() Appender {
|
|||
func (h *Head) appender() *headAppender {
|
||||
return &headAppender{
|
||||
head: h,
|
||||
minValidTime: h.MaxTime() - h.chunkRange/2,
|
||||
// Set the minimum valid time to whichever is greater, the head min valid time or the compaction window.
|
||||
// This ensures that no samples will be added within the compaction window to avoid races.
|
||||
minValidTime: max(h.minValidTime, h.MaxTime()-h.chunkRange/2),
|
||||
mint: math.MaxInt64,
|
||||
maxt: math.MinInt64,
|
||||
samples: h.getAppendBuffer(),
|
||||
}
|
||||
}
|
||||
|
||||
func max(a, b int64) int64 {
|
||||
if a > b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (h *Head) getAppendBuffer() []RefSample {
|
||||
b := h.appendPool.Get()
|
||||
if b == nil {
|
||||
|
@ -1411,7 +1420,7 @@ func (s *memSeries) cut(mint int64) *memChunk {
|
|||
|
||||
// Set upper bound on when the next chunk must be started. An earlier timestamp
|
||||
// may be chosen dynamically at a later point.
|
||||
_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
|
||||
s.nextAt = rangeForTimestamp(mint, s.chunkRange)
|
||||
|
||||
app, err := c.chunk.Appender()
|
||||
if err != nil {
|
||||
|
|
10
head_test.go
10
head_test.go
|
@ -15,6 +15,7 @@ package tsdb
|
|||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -123,7 +124,7 @@ func TestHead_ReadWAL(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
defer head.Close()
|
||||
|
||||
testutil.Ok(t, head.Init())
|
||||
testutil.Ok(t, head.Init(math.MinInt64))
|
||||
testutil.Equals(t, uint64(100), head.lastSeriesID)
|
||||
|
||||
s10 := head.series.getByID(10)
|
||||
|
@ -132,7 +133,7 @@ func TestHead_ReadWAL(t *testing.T) {
|
|||
s100 := head.series.getByID(100)
|
||||
|
||||
testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
|
||||
testutil.Equals(t, labels.FromStrings("a", "2"), s11.lset)
|
||||
testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init().
|
||||
testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
|
||||
testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)
|
||||
|
||||
|
@ -146,7 +147,6 @@ func TestHead_ReadWAL(t *testing.T) {
|
|||
}
|
||||
|
||||
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
|
||||
testutil.Equals(t, 0, len(s11.chunks))
|
||||
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
|
||||
testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
|
||||
}
|
||||
|
@ -288,7 +288,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
defer head.Close()
|
||||
|
||||
testutil.Ok(t, head.Init())
|
||||
testutil.Ok(t, head.Init(math.MinInt64))
|
||||
|
||||
testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
|
||||
}
|
||||
|
@ -923,7 +923,7 @@ func TestWalRepair(t *testing.T) {
|
|||
|
||||
h, err := NewHead(nil, nil, w, 1)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, h.Init())
|
||||
testutil.Ok(t, h.Init(math.MinInt64))
|
||||
|
||||
sr, err := wal.NewSegmentsReader(dir)
|
||||
testutil.Ok(t, err)
|
||||
|
|
|
@ -1292,7 +1292,7 @@ func BenchmarkPersistedQueries(b *testing.B) {
|
|||
dir, err := ioutil.TempDir("", "bench_persisted")
|
||||
testutil.Ok(b, err)
|
||||
defer os.RemoveAll(dir)
|
||||
block := createPopulatedBlock(b, dir, nSeries, nSamples)
|
||||
block := createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples))
|
||||
defer block.Close()
|
||||
|
||||
q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime)
|
||||
|
|
Loading…
Reference in a new issue