tsdb: turn off transaction isolation for head compaction (#11317)

* tsdb: add a basic test for read/write isolation

* tsdb: store the min time with isolationAppender
So that we can see when appending has moved past a certain point in time.

* tsdb: allow RangeHead to have isolation disabled
This will be used when for head compaction.

* tsdb: do head compaction with isolation disabled
This saves a lot of work tracking appends done while compaction is ongoing.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2022-09-27 15:01:23 +01:00 committed by GitHub
parent d0607435a2
commit ff00dee262
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 116 additions and 8 deletions

View file

@ -1059,7 +1059,15 @@ func (db *DB) Compact() (returnErr error) {
// so in order to make sure that overlaps are evaluated // so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value // consistently, we explicitly remove the last value
// from the block interval here. // from the block interval here.
if err := db.compactHead(NewRangeHead(db.head, mint, maxt-1)); err != nil { rh := NewRangeHeadWithIsolationDisabled(db.head, mint, maxt-1)
// Compaction runs with isolation disabled, because head.compactable()
// ensures that maxt is more than chunkRange/2 back from now, and
// head.appendableMinValidTime() ensures that no new appends can start within the compaction range.
// We do need to wait for any overlapping appenders that started previously to finish.
db.head.WaitForAppendersOverlapping(rh.MaxTime())
if err := db.compactHead(rh); err != nil {
return errors.Wrap(err, "compact head") return errors.Wrap(err, "compact head")
} }
// Consider only successful compactions for WAL truncation. // Consider only successful compactions for WAL truncation.

View file

@ -1030,6 +1030,13 @@ func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) {
} }
} }
// WaitForAppendersOverlapping waits for appends overlapping maxt to finish.
func (h *Head) WaitForAppendersOverlapping(maxt int64) {
for maxt >= h.iso.lowestAppendTime() {
time.Sleep(500 * time.Millisecond)
}
}
// IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier // IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier
// has to be created. In the latter case, the method also returns the new mint to be used for creating the // has to be created. In the latter case, the method also returns the new mint to be used for creating the
// new range head and the new querier. This methods helps preventing races with the truncation of in-memory data. // new range head and the new querier. This methods helps preventing races with the truncation of in-memory data.
@ -1235,6 +1242,8 @@ func (h *Head) Stats(statsByLabelName string) *Stats {
type RangeHead struct { type RangeHead struct {
head *Head head *Head
mint, maxt int64 mint, maxt int64
isolationOff bool
} }
// NewRangeHead returns a *RangeHead. // NewRangeHead returns a *RangeHead.
@ -1247,12 +1256,23 @@ func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
} }
} }
// NewRangeHeadWithIsolationDisabled returns a *RangeHead that does not create an isolationState.
func NewRangeHeadWithIsolationDisabled(head *Head, mint, maxt int64) *RangeHead {
rh := NewRangeHead(head, mint, maxt)
rh.isolationOff = true
return rh
}
func (h *RangeHead) Index() (IndexReader, error) { func (h *RangeHead) Index() (IndexReader, error) {
return h.head.indexRange(h.mint, h.maxt), nil return h.head.indexRange(h.mint, h.maxt), nil
} }
func (h *RangeHead) Chunks() (ChunkReader, error) { func (h *RangeHead) Chunks() (ChunkReader, error) {
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State(h.mint, h.maxt)) var isoState *isolationState
if !h.isolationOff {
isoState = h.head.iso.State(h.mint, h.maxt)
}
return h.head.chunksRange(h.mint, h.maxt, isoState)
} }
func (h *RangeHead) Tombstones() (tombstones.Reader, error) { func (h *RangeHead) Tombstones() (tombstones.Reader, error) {

View file

@ -124,7 +124,8 @@ func (h *Head) Appender(_ context.Context) storage.Appender {
} }
func (h *Head) appender() *headAppender { func (h *Head) appender() *headAppender {
appendID, cleanupAppendIDsBelow := h.iso.newAppendID() // Every appender gets an ID that is cleared upon commit/rollback. minValidTime := h.appendableMinValidTime()
appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.
// Allocate the exemplars buffer only if exemplars are enabled. // Allocate the exemplars buffer only if exemplars are enabled.
var exemplarsBuf []exemplarWithSeriesRef var exemplarsBuf []exemplarWithSeriesRef
@ -134,7 +135,7 @@ func (h *Head) appender() *headAppender {
return &headAppender{ return &headAppender{
head: h, head: h,
minValidTime: h.appendableMinValidTime(), minValidTime: minValidTime,
mint: math.MaxInt64, mint: math.MaxInt64,
maxt: math.MinInt64, maxt: math.MinInt64,
headMaxt: h.MaxTime(), headMaxt: h.MaxTime(),

View file

@ -262,7 +262,9 @@ type headChunkReader struct {
} }
func (h *headChunkReader) Close() error { func (h *headChunkReader) Close() error {
if h.isoState != nil {
h.isoState.Close() h.isoState.Close()
}
return nil return nil
} }

View file

@ -14,6 +14,7 @@
package tsdb package tsdb
import ( import (
"math"
"sync" "sync"
) )
@ -45,6 +46,7 @@ func (i *isolationState) IsolationDisabled() bool {
type isolationAppender struct { type isolationAppender struct {
appendID uint64 appendID uint64
minTime int64
prev *isolationAppender prev *isolationAppender
next *isolationAppender next *isolationAppender
} }
@ -116,6 +118,21 @@ func (i *isolation) lowWatermarkLocked() uint64 {
return i.appendsOpenList.next.appendID return i.appendsOpenList.next.appendID
} }
// lowestAppendTime returns the lowest minTime for any open appender,
// or math.MaxInt64 if no open appenders.
func (i *isolation) lowestAppendTime() int64 {
var lowest int64 = math.MaxInt64
i.appendMtx.RLock()
defer i.appendMtx.RUnlock()
for a := i.appendsOpenList.next; a != i.appendsOpenList; a = a.next {
if lowest > a.minTime {
lowest = a.minTime
}
}
return lowest
}
// State returns an object used to control isolation // State returns an object used to control isolation
// between a query and appends. Must be closed when complete. // between a query and appends. Must be closed when complete.
func (i *isolation) State(mint, maxt int64) *isolationState { func (i *isolation) State(mint, maxt int64) *isolationState {
@ -164,7 +181,7 @@ func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) {
// newAppendID increments the transaction counter and returns a new transaction // newAppendID increments the transaction counter and returns a new transaction
// ID. The first ID returned is 1. // ID. The first ID returned is 1.
// Also returns the low watermark, to keep lock/unlock operations down. // Also returns the low watermark, to keep lock/unlock operations down.
func (i *isolation) newAppendID() (uint64, uint64) { func (i *isolation) newAppendID(minTime int64) (uint64, uint64) {
if i.disabled { if i.disabled {
return 0, 0 return 0, 0
} }
@ -177,6 +194,7 @@ func (i *isolation) newAppendID() (uint64, uint64) {
app := i.appendersPool.Get().(*isolationAppender) app := i.appendersPool.Get().(*isolationAppender)
app.appendID = i.appendsOpenList.appendID app.appendID = i.appendsOpenList.appendID
app.minTime = minTime
app.prev = i.appendsOpenList.prev app.prev = i.appendsOpenList.prev
app.next = i.appendsOpenList app.next = i.appendsOpenList

View file

@ -18,8 +18,67 @@ import (
"strconv" "strconv"
"sync" "sync"
"testing" "testing"
"github.com/stretchr/testify/require"
) )
func TestIsolation(t *testing.T) {
type result struct {
id uint64
lowWatermark uint64
}
var appendA, appendB result
iso := newIsolation(false)
// Low watermark starts at 1.
require.Equal(t, uint64(0), iso.lowWatermark())
require.Equal(t, int64(math.MaxInt64), iso.lowestAppendTime())
// Pretend we are starting to append.
appendA.id, appendA.lowWatermark = iso.newAppendID(10)
require.Equal(t, result{1, 1}, appendA)
require.Equal(t, uint64(1), iso.lowWatermark())
require.Equal(t, 0, countOpenReads(iso))
require.Equal(t, int64(10), iso.lowestAppendTime())
// Now we start a read.
stateA := iso.State(10, 20)
require.Equal(t, 1, countOpenReads(iso))
// Second appender.
appendB.id, appendB.lowWatermark = iso.newAppendID(20)
require.Equal(t, result{2, 1}, appendB)
require.Equal(t, uint64(1), iso.lowWatermark())
require.Equal(t, int64(10), iso.lowestAppendTime())
iso.closeAppend(appendA.id)
// Low watermark remains at 1 because stateA is still open
require.Equal(t, uint64(1), iso.lowWatermark())
require.Equal(t, 1, countOpenReads(iso))
require.Equal(t, int64(20), iso.lowestAppendTime())
// Finish the read and low watermark should rise.
stateA.Close()
require.Equal(t, uint64(2), iso.lowWatermark())
require.Equal(t, 0, countOpenReads(iso))
iso.closeAppend(appendB.id)
require.Equal(t, uint64(2), iso.lowWatermark())
require.Equal(t, int64(math.MaxInt64), iso.lowestAppendTime())
}
func countOpenReads(iso *isolation) int {
count := 0
iso.TraverseOpenReads(func(s *isolationState) bool {
count++
return true
})
return count
}
func BenchmarkIsolation(b *testing.B) { func BenchmarkIsolation(b *testing.B) {
for _, goroutines := range []int{10, 100, 1000, 10000} { for _, goroutines := range []int{10, 100, 1000, 10000} {
b.Run(strconv.Itoa(goroutines), func(b *testing.B) { b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
@ -36,7 +95,7 @@ func BenchmarkIsolation(b *testing.B) {
<-start <-start
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
appendID, _ := iso.newAppendID() appendID, _ := iso.newAppendID(0)
iso.closeAppend(appendID) iso.closeAppend(appendID)
} }
@ -66,7 +125,7 @@ func BenchmarkIsolationWithState(b *testing.B) {
<-start <-start
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
appendID, _ := iso.newAppendID() appendID, _ := iso.newAppendID(0)
iso.closeAppend(appendID) iso.closeAppend(appendID)
} }