diff --git a/tsdb/db.go b/tsdb/db.go
index 7cf70bcc2..6444b3a12 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -1059,7 +1059,15 @@ func (db *DB) Compact() (returnErr error) {
 		// so in order to make sure that overlaps are evaluated
 		// consistently, we explicitly remove the last value
 		// from the block interval here.
-		if err := db.compactHead(NewRangeHead(db.head, mint, maxt-1)); err != nil {
+		rh := NewRangeHeadWithIsolationDisabled(db.head, mint, maxt-1)
+
+		// Compaction runs with isolation disabled, because head.compactable()
+		// ensures that maxt is more than chunkRange/2 back from now, and
+		// head.appendableMinValidTime() ensures that no new appends can start within the compaction range.
+		// We do need to wait for any overlapping appenders that started previously to finish.
+		db.head.WaitForAppendersOverlapping(rh.MaxTime())
+
+		if err := db.compactHead(rh); err != nil {
 			return errors.Wrap(err, "compact head")
 		}
 		// Consider only successful compactions for WAL truncation.
diff --git a/tsdb/head.go b/tsdb/head.go
index 79232a0a8..90cfacf79 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1030,6 +1030,13 @@ func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) {
 	}
 }
 
+// WaitForAppendersOverlapping waits for appends overlapping maxt to finish.
+func (h *Head) WaitForAppendersOverlapping(maxt int64) {
+	for maxt >= h.iso.lowestAppendTime() {
+		time.Sleep(500 * time.Millisecond)
+	}
+}
+
 // IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier
 // has to be created. In the latter case, the method also returns the new mint to be used for creating the
 // new range head and the new querier. This methods helps preventing races with the truncation of in-memory data.
@@ -1235,6 +1242,8 @@ func (h *Head) Stats(statsByLabelName string) *Stats {
 type RangeHead struct {
 	head       *Head
 	mint, maxt int64
+
+	isolationOff bool
 }
 
 // NewRangeHead returns a *RangeHead.
@@ -1247,12 +1256,23 @@ func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
 	}
 }
 
+// NewRangeHeadWithIsolationDisabled returns a *RangeHead that does not create an isolationState.
+func NewRangeHeadWithIsolationDisabled(head *Head, mint, maxt int64) *RangeHead {
+	rh := NewRangeHead(head, mint, maxt)
+	rh.isolationOff = true
+	return rh
+}
+
 func (h *RangeHead) Index() (IndexReader, error) {
 	return h.head.indexRange(h.mint, h.maxt), nil
 }
 
 func (h *RangeHead) Chunks() (ChunkReader, error) {
-	return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State(h.mint, h.maxt))
+	var isoState *isolationState
+	if !h.isolationOff {
+		isoState = h.head.iso.State(h.mint, h.maxt)
+	}
+	return h.head.chunksRange(h.mint, h.maxt, isoState)
 }
 
 func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
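WaitForAppendersOverlapping is a plain poll: it spins (sleeping 500ms per iteration) while any open appender's minTime is at or below maxt, and falls through once the last such appender closes. A rough test-style sketch of that behavior, not part of this patch: the test name is hypothetical, it uses the unexported isolation type directly (as the package's own tests do), and it shortens the production sleep.

package tsdb

import (
	"testing"
	"time"
)

// Hypothetical sketch: a WaitForAppendersOverlapping-style loop unblocks
// once the last overlapping appender closes.
func TestWaitLoopUnblocksSketch(t *testing.T) {
	iso := newIsolation(false)

	// An appender that may still write samples at t <= 1000.
	appendID, _ := iso.newAppendID(1000)

	done := make(chan struct{})
	go func() {
		for 1000 >= iso.lowestAppendTime() { // same condition as WaitForAppendersOverlapping(1000)
			time.Sleep(time.Millisecond)
		}
		close(done)
	}()

	// Closing the appender empties the open list, lowestAppendTime()
	// becomes math.MaxInt64, and the waiter falls through.
	iso.closeAppend(appendID)
	<-done
}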
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 48682ea28..067281cc4 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -124,7 +124,8 @@ func (h *Head) Appender(_ context.Context) storage.Appender {
 }
 
 func (h *Head) appender() *headAppender {
-	appendID, cleanupAppendIDsBelow := h.iso.newAppendID() // Every appender gets an ID that is cleared upon commit/rollback.
+	minValidTime := h.appendableMinValidTime()
+	appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.
 
 	// Allocate the exemplars buffer only if exemplars are enabled.
 	var exemplarsBuf []exemplarWithSeriesRef
@@ -134,7 +135,7 @@ func (h *Head) appender() *headAppender {
 
 	return &headAppender{
 		head:                  h,
-		minValidTime:          h.appendableMinValidTime(),
+		minValidTime:          minValidTime,
 		mint:                  math.MaxInt64,
 		maxt:                  math.MinInt64,
 		headMaxt:              h.MaxTime(),
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 5b2a70c03..0fe24792a 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -262,7 +262,9 @@ type headChunkReader struct {
 }
 
 func (h *headChunkReader) Close() error {
-	h.isoState.Close()
+	if h.isoState != nil {
+		h.isoState.Close()
+	}
 	return nil
 }
 
diff --git a/tsdb/isolation.go b/tsdb/isolation.go
index 46d13edf8..74d63c6af 100644
--- a/tsdb/isolation.go
+++ b/tsdb/isolation.go
@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"math"
 	"sync"
 )
 
@@ -45,6 +46,7 @@ func (i *isolationState) IsolationDisabled() bool {
 
 type isolationAppender struct {
 	appendID uint64
+	minTime  int64
 	prev     *isolationAppender
 	next     *isolationAppender
 }
@@ -116,6 +118,21 @@ func (i *isolation) lowWatermarkLocked() uint64 {
 	return i.appendsOpenList.next.appendID
 }
 
+// lowestAppendTime returns the lowest minTime for any open appender,
+// or math.MaxInt64 if no open appenders.
+func (i *isolation) lowestAppendTime() int64 {
+	var lowest int64 = math.MaxInt64
+	i.appendMtx.RLock()
+	defer i.appendMtx.RUnlock()
+
+	for a := i.appendsOpenList.next; a != i.appendsOpenList; a = a.next {
+		if lowest > a.minTime {
+			lowest = a.minTime
+		}
+	}
+	return lowest
+}
+
 // State returns an object used to control isolation
 // between a query and appends. Must be closed when complete.
 func (i *isolation) State(mint, maxt int64) *isolationState {
@@ -164,7 +181,7 @@ func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) {
 // newAppendID increments the transaction counter and returns a new transaction
 // ID. The first ID returned is 1.
 // Also returns the low watermark, to keep lock/unlock operations down.
-func (i *isolation) newAppendID() (uint64, uint64) {
+func (i *isolation) newAppendID(minTime int64) (uint64, uint64) {
 	if i.disabled {
 		return 0, 0
 	}
@@ -177,6 +194,7 @@ func (i *isolation) newAppendID() (uint64, uint64) {
 
 	app := i.appendersPool.Get().(*isolationAppender)
 	app.appendID = i.appendsOpenList.appendID
+	app.minTime = minTime
 	app.prev = i.appendsOpenList.prev
 	app.next = i.appendsOpenList
 
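lowestAppendTime walks appendsOpenList, a circular doubly-linked list whose sentinel node serves as both head and tail: traversal starts at the sentinel's next pointer and stops when it wraps back to the sentinel, so an empty list never enters the loop and yields math.MaxInt64. A standalone sketch of that idiom, with hypothetical node/newRing/insert/lowest names; only the traversal shape mirrors the real code.

package main

import (
	"fmt"
	"math"
)

// node mirrors the shape of isolationAppender: links into a circular,
// doubly-linked list with a sentinel.
type node struct {
	minTime    int64
	prev, next *node
}

// newRing returns a sentinel whose prev/next point at itself (empty ring).
func newRing() *node {
	s := &node{}
	s.prev, s.next = s, s
	return s
}

// insert links a new node in front of the sentinel, the same way
// newAppendID links a fresh appender via app.prev/app.next.
func insert(s *node, minTime int64) *node {
	n := &node{minTime: minTime, prev: s.prev, next: s}
	s.prev.next = n
	s.prev = n
	return n
}

// lowest walks the ring exactly like lowestAppendTime: start at s.next,
// stop when the walk wraps back to the sentinel.
func lowest(s *node) int64 {
	low := int64(math.MaxInt64)
	for n := s.next; n != s; n = n.next {
		if n.minTime < low {
			low = n.minTime
		}
	}
	return low
}

func main() {
	s := newRing()
	fmt.Println(lowest(s)) // 9223372036854775807: no open appenders
	insert(s, 20)
	insert(s, 10)
	fmt.Println(lowest(s)) // 10
}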
diff --git a/tsdb/isolation_test.go b/tsdb/isolation_test.go
index 6e27e06fc..36083a102 100644
--- a/tsdb/isolation_test.go
+++ b/tsdb/isolation_test.go
@@ -18,8 +18,68 @@ import (
+	"math"
 	"strconv"
 	"sync"
 	"testing"
+
+	"github.com/stretchr/testify/require"
 )
 
+func TestIsolation(t *testing.T) {
+	type result struct {
+		id           uint64
+		lowWatermark uint64
+	}
+	var appendA, appendB result
+	iso := newIsolation(false)
+
+	// Low watermark starts at 0.
+	require.Equal(t, uint64(0), iso.lowWatermark())
+	require.Equal(t, int64(math.MaxInt64), iso.lowestAppendTime())
+
+	// Pretend we are starting to append.
+	appendA.id, appendA.lowWatermark = iso.newAppendID(10)
+	require.Equal(t, result{1, 1}, appendA)
+	require.Equal(t, uint64(1), iso.lowWatermark())
+
+	require.Equal(t, 0, countOpenReads(iso))
+	require.Equal(t, int64(10), iso.lowestAppendTime())
+
+	// Now we start a read.
+	stateA := iso.State(10, 20)
+	require.Equal(t, 1, countOpenReads(iso))
+
+	// Second appender.
+	appendB.id, appendB.lowWatermark = iso.newAppendID(20)
+	require.Equal(t, result{2, 1}, appendB)
+	require.Equal(t, uint64(1), iso.lowWatermark())
+	require.Equal(t, int64(10), iso.lowestAppendTime())
+
+	iso.closeAppend(appendA.id)
+	// Low watermark remains at 1 because stateA is still open.
+	require.Equal(t, uint64(1), iso.lowWatermark())
+
+	require.Equal(t, 1, countOpenReads(iso))
+	require.Equal(t, int64(20), iso.lowestAppendTime())
+
+	// Finish the read and the low watermark should rise.
+	stateA.Close()
+	require.Equal(t, uint64(2), iso.lowWatermark())
+
+	require.Equal(t, 0, countOpenReads(iso))
+
+	iso.closeAppend(appendB.id)
+	require.Equal(t, uint64(2), iso.lowWatermark())
+	require.Equal(t, int64(math.MaxInt64), iso.lowestAppendTime())
+}
+
+func countOpenReads(iso *isolation) int {
+	count := 0
+	iso.TraverseOpenReads(func(s *isolationState) bool {
+		count++
+		return true
+	})
+	return count
+}
+
 func BenchmarkIsolation(b *testing.B) {
 	for _, goroutines := range []int{10, 100, 1000, 10000} {
 		b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
@@ -36,7 +96,7 @@ func BenchmarkIsolation(b *testing.B) {
 				<-start
 
 				for i := 0; i < b.N; i++ {
-					appendID, _ := iso.newAppendID()
+					appendID, _ := iso.newAppendID(0)
 					iso.closeAppend(appendID)
 				}
 
@@ -66,7 +126,7 @@ func BenchmarkIsolationWithState(b *testing.B) {
 				<-start
 
 				for i := 0; i < b.N; i++ {
-					appendID, _ := iso.newAppendID()
+					appendID, _ := iso.newAppendID(0)
 					iso.closeAppend(appendID)
 				}
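To make the safety argument in the db.go comment concrete: with a 2h chunk range, the head must span more than 3h before it is compactable, compaction targets the oldest full 2h window, and any appender opened from that point on has a minValidTime strictly above the compacted range. A rough numeric sketch with hypothetical numbers; it assumes appendableMinValidTime is max(minValidTime, maxTime-chunkRange/2), per its definition in tsdb/head_append.go, which should be treated as an assumption here.

package main

import "fmt"

func max(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	const hour = int64(60 * 60 * 1000) // timestamps are in milliseconds
	chunkRange := 2 * hour

	// head.compactable() requires maxTime-minTime > chunkRange/2*3, so the
	// head spans just over 3h when compaction first triggers.
	headMint, headMaxt := int64(0), 3*hour+1

	// Compaction targets the oldest full chunk range, [0, 2h); the RangeHead
	// is built with maxt-1, so it covers samples up to and including 2h-1.
	compactMaxt := chunkRange - 1

	// Assumed formula for appendableMinValidTime: new appenders reject any
	// sample with a timestamp below this bound.
	minValid := max(headMint, headMaxt-chunkRange/2) // = 2h+1

	// No *new* appender can write into the compacted range, so compaction
	// only has to wait out appenders opened before this point.
	fmt.Println(minValid > compactMaxt) // true
}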