mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
tsdb: Block Head GC till pending readers are done reading (#9081)
* tsdb: Block Head GC till pending readers are done reading Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments 2 Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix the exclusiveness of maxt in WaitForPendingReadersInTimeRange Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
1bcd13d6b5
commit
59d02b5ef0
58
tsdb/db.go
58
tsdb/db.go
|
@ -1525,8 +1525,32 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err
|
|||
blocks = append(blocks, b)
|
||||
}
|
||||
}
|
||||
var headQuerier storage.Querier
|
||||
if maxt >= db.head.MinTime() {
|
||||
blocks = append(blocks, NewRangeHead(db.head, mint, maxt))
|
||||
rh := NewRangeHead(db.head, mint, maxt)
|
||||
var err error
|
||||
headQuerier, err = NewBlockQuerier(rh, mint, maxt)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "open querier for head %s", rh)
|
||||
}
|
||||
|
||||
// Getting the querier above registers itself in the queue that the truncation waits on.
|
||||
// So if the querier is currently not colliding with any truncation, we can continue to use it and still
|
||||
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
|
||||
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
|
||||
if shouldClose {
|
||||
if err := headQuerier.Close(); err != nil {
|
||||
return nil, errors.Wrapf(err, "closing head querier %s", rh)
|
||||
}
|
||||
headQuerier = nil
|
||||
}
|
||||
if getNew {
|
||||
rh := NewRangeHead(db.head, newMint, maxt)
|
||||
headQuerier, err = NewBlockQuerier(rh, newMint, maxt)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "open querier for head while getting new querier %s", rh)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
blockQueriers := make([]storage.Querier, 0, len(blocks))
|
||||
|
@ -1543,6 +1567,9 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err
|
|||
}
|
||||
return nil, errors.Wrapf(err, "open querier for block %s", b)
|
||||
}
|
||||
if headQuerier != nil {
|
||||
blockQueriers = append(blockQueriers, headQuerier)
|
||||
}
|
||||
return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil
|
||||
}
|
||||
|
||||
|
@ -1558,8 +1585,32 @@ func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQu
|
|||
blocks = append(blocks, b)
|
||||
}
|
||||
}
|
||||
var headQuerier storage.ChunkQuerier
|
||||
if maxt >= db.head.MinTime() {
|
||||
blocks = append(blocks, NewRangeHead(db.head, mint, maxt))
|
||||
rh := NewRangeHead(db.head, mint, maxt)
|
||||
var err error
|
||||
headQuerier, err = NewBlockChunkQuerier(rh, mint, maxt)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "open querier for head %s", rh)
|
||||
}
|
||||
|
||||
// Getting the querier above registers itself in the queue that the truncation waits on.
|
||||
// So if the querier is currently not colliding with any truncation, we can continue to use it and still
|
||||
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
|
||||
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
|
||||
if shouldClose {
|
||||
if err := headQuerier.Close(); err != nil {
|
||||
return nil, errors.Wrapf(err, "closing head querier %s", rh)
|
||||
}
|
||||
headQuerier = nil
|
||||
}
|
||||
if getNew {
|
||||
rh := NewRangeHead(db.head, newMint, maxt)
|
||||
headQuerier, err = NewBlockChunkQuerier(rh, newMint, maxt)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "open querier for head while getting new querier %s", rh)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks))
|
||||
|
@ -1576,6 +1627,9 @@ func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQu
|
|||
}
|
||||
return nil, errors.Wrapf(err, "open querier for block %s", b)
|
||||
}
|
||||
if headQuerier != nil {
|
||||
blockQueriers = append(blockQueriers, headQuerier)
|
||||
}
|
||||
|
||||
return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
|
||||
}
|
||||
|
|
|
@ -3444,3 +3444,18 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
|
|||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestDB(t *testing.T) *DB {
|
||||
dir, err := ioutil.TempDir("", "test")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, os.RemoveAll(dir))
|
||||
})
|
||||
|
||||
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
return db
|
||||
}
|
||||
|
|
102
tsdb/head.go
102
tsdb/head.go
|
@ -64,12 +64,13 @@ type ExemplarStorage interface {
|
|||
|
||||
// Head handles reads and writes of time series data within a time window.
|
||||
type Head struct {
|
||||
chunkRange atomic.Int64
|
||||
numSeries atomic.Uint64
|
||||
minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head.
|
||||
minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
|
||||
lastWALTruncationTime atomic.Int64
|
||||
lastSeriesID atomic.Uint64
|
||||
chunkRange atomic.Int64
|
||||
numSeries atomic.Uint64
|
||||
minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head.
|
||||
minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
|
||||
lastWALTruncationTime atomic.Int64
|
||||
lastMemoryTruncationTime atomic.Int64
|
||||
lastSeriesID atomic.Uint64
|
||||
|
||||
metrics *headMetrics
|
||||
opts *HeadOptions
|
||||
|
@ -110,6 +111,8 @@ type Head struct {
|
|||
|
||||
stats *HeadStats
|
||||
reg prometheus.Registerer
|
||||
|
||||
memTruncationInProcess atomic.Bool
|
||||
}
|
||||
|
||||
// HeadOptions are parameters for the Head block.
|
||||
|
@ -414,6 +417,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
|
|||
h.minTime.Store(math.MaxInt64)
|
||||
h.maxTime.Store(math.MinInt64)
|
||||
h.lastWALTruncationTime.Store(math.MinInt64)
|
||||
h.lastMemoryTruncationTime.Store(math.MinInt64)
|
||||
h.metrics = newHeadMetrics(h, r)
|
||||
|
||||
if opts.ChunkPool == nil {
|
||||
|
@ -974,11 +978,24 @@ func (h *Head) truncateMemory(mint int64) (err error) {
|
|||
h.metrics.headTruncateFail.Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
initialize := h.MinTime() == math.MaxInt64
|
||||
|
||||
if h.MinTime() >= mint && !initialize {
|
||||
return nil
|
||||
}
|
||||
|
||||
// The order of these two Store() should not be changed,
|
||||
// i.e. truncation time is set before in-process boolean.
|
||||
h.lastMemoryTruncationTime.Store(mint)
|
||||
h.memTruncationInProcess.Store(true)
|
||||
defer h.memTruncationInProcess.Store(false)
|
||||
|
||||
// We wait for pending queries to end that overlap with this truncation.
|
||||
if !initialize {
|
||||
h.WaitForPendingReadersInTimeRange(h.MinTime(), mint)
|
||||
}
|
||||
|
||||
h.minTime.Store(mint)
|
||||
h.minValidTime.Store(mint)
|
||||
|
||||
|
@ -1020,6 +1037,75 @@ func (h *Head) truncateMemory(mint int64) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
|
||||
// The query timeout limits the max wait time of this function implicitly.
|
||||
// The mint is inclusive and maxt is the truncation time hence exclusive.
|
||||
func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) {
|
||||
maxt-- // Making it inclusive before checking overlaps.
|
||||
overlaps := func() bool {
|
||||
o := false
|
||||
h.iso.TraverseOpenReads(func(s *isolationState) bool {
|
||||
if s.mint <= maxt && mint <= s.maxt {
|
||||
// Overlaps with the truncation range.
|
||||
o = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return o
|
||||
}
|
||||
for overlaps() {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier
|
||||
// has to be created. In the latter case, the method also returns the new mint to be used for creating the
|
||||
// new range head and the new querier. This methods helps preventing races with the truncation of in-memory data.
|
||||
//
|
||||
// NOTE: The querier should already be taken before calling this.
|
||||
func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose bool, getNew bool, newMint int64) {
|
||||
if !h.memTruncationInProcess.Load() {
|
||||
return false, false, 0
|
||||
}
|
||||
// Head truncation is in process. It also means that the block that was
|
||||
// created for this truncation range is also available.
|
||||
// Check if we took a querier that overlaps with this truncation.
|
||||
memTruncTime := h.lastMemoryTruncationTime.Load()
|
||||
if querierMaxt < memTruncTime {
|
||||
// Head compaction has happened and this time range is being truncated.
|
||||
// This query doesn't overlap with the Head any longer.
|
||||
// We should close this querier to avoid races and the data would be
|
||||
// available with the blocks below.
|
||||
// Cases:
|
||||
// 1. |------truncation------|
|
||||
// |---query---|
|
||||
// 2. |------truncation------|
|
||||
// |---query---|
|
||||
return true, false, 0
|
||||
}
|
||||
if querierMint < memTruncTime {
|
||||
// The truncation time is not same as head mint that we saw above but the
|
||||
// query still overlaps with the Head.
|
||||
// The truncation started after we got the querier. So it is not safe
|
||||
// to use this querier and/or might block truncation. We should get
|
||||
// a new querier for the new Head range while remaining will be available
|
||||
// in the blocks below.
|
||||
// Case:
|
||||
// |------truncation------|
|
||||
// |----query----|
|
||||
// Turns into
|
||||
// |------truncation------|
|
||||
// |---qu---|
|
||||
return true, true, memTruncTime
|
||||
}
|
||||
|
||||
// Other case is this, which is a no-op
|
||||
// |------truncation------|
|
||||
// |---query---|
|
||||
return false, false, 0
|
||||
}
|
||||
|
||||
// truncateWAL removes old data before mint from the WAL.
|
||||
func (h *Head) truncateWAL(mint int64) error {
|
||||
if h.wal == nil || mint <= h.lastWALTruncationTime.Load() {
|
||||
|
@ -1147,7 +1233,7 @@ func (h *RangeHead) Index() (IndexReader, error) {
|
|||
}
|
||||
|
||||
func (h *RangeHead) Chunks() (ChunkReader, error) {
|
||||
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State())
|
||||
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State(h.mint, h.maxt))
|
||||
}
|
||||
|
||||
func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
|
||||
|
@ -1721,7 +1807,7 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
|
|||
|
||||
// Chunks returns a ChunkReader against the block.
|
||||
func (h *Head) Chunks() (ChunkReader, error) {
|
||||
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State())
|
||||
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64))
|
||||
}
|
||||
|
||||
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) {
|
||||
|
|
|
@ -16,6 +16,7 @@ package tsdb
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"math/rand"
|
||||
|
@ -25,10 +26,12 @@ import (
|
|||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||
|
@ -1230,11 +1233,11 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
|||
|
||||
q, err := NewBlockQuerier(h, 1500, 2500)
|
||||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
|
||||
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
require.Equal(t, false, ss.Next())
|
||||
require.Equal(t, 0, len(ss.Warnings()))
|
||||
require.NoError(t, q.Close())
|
||||
|
||||
// Truncate again, this time the series should be deleted
|
||||
require.NoError(t, h.Truncate(2050))
|
||||
|
@ -1490,7 +1493,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
|
||||
require.NoError(t, err)
|
||||
|
||||
iso := h.iso.State()
|
||||
iso := h.iso.State(math.MinInt64, math.MaxInt64)
|
||||
iso.maxAppendID = maxAppendID
|
||||
|
||||
chunks, err := h.chunksRange(math.MinInt64, math.MaxInt64, iso)
|
||||
|
@ -1705,7 +1708,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
|
|||
require.NoError(t, app2.Commit())
|
||||
require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not committed yet.")
|
||||
|
||||
is := hb.iso.State()
|
||||
is := hb.iso.State(math.MinInt64, math.MaxInt64)
|
||||
require.Equal(t, uint64(2), hb.iso.lowWatermark(), "After simulated read (iso state retrieved), low watermark should stay at 2.")
|
||||
|
||||
require.NoError(t, app1.Commit())
|
||||
|
@ -2179,3 +2182,218 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
|
|||
ok = it.Seek(7)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
// Tests https://github.com/prometheus/prometheus/issues/8221.
|
||||
func TestChunkNotFoundHeadGCRace(t *testing.T) {
|
||||
db := newTestDB(t)
|
||||
db.DisableCompactions()
|
||||
|
||||
var (
|
||||
app = db.Appender(context.Background())
|
||||
ref = uint64(0)
|
||||
mint, maxt = int64(0), int64(0)
|
||||
err error
|
||||
)
|
||||
|
||||
// Appends samples to span over 1.5 block ranges.
|
||||
// 7 chunks with 15s scrape interval.
|
||||
for i := int64(0); i <= 120*7; i++ {
|
||||
ts := i * DefaultBlockDuration / (4 * 120)
|
||||
ref, err = app.Append(ref, labels.FromStrings("a", "b"), ts, float64(i))
|
||||
require.NoError(t, err)
|
||||
maxt = ts
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Get a querier before compaction (or when compaction is about to begin).
|
||||
q, err := db.Querier(context.Background(), mint, maxt)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Query the compacted range and get the first series before compaction.
|
||||
ss := q.Select(true, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
require.True(t, ss.Next())
|
||||
s := ss.At()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// Compacting head while the querier spans the compaction time.
|
||||
require.NoError(t, db.Compact())
|
||||
require.Greater(t, len(db.Blocks()), 0)
|
||||
}()
|
||||
|
||||
// Give enough time for compaction to finish.
|
||||
// We expect it to be blocked until querier is closed.
|
||||
<-time.After(3 * time.Second)
|
||||
|
||||
// Now consume after compaction when it's gone.
|
||||
it := s.Iterator()
|
||||
for it.Next() {
|
||||
_, _ = it.At()
|
||||
}
|
||||
// It should error here without any fix for the mentioned issue.
|
||||
require.NoError(t, it.Err())
|
||||
for ss.Next() {
|
||||
s = ss.At()
|
||||
it := s.Iterator()
|
||||
for it.Next() {
|
||||
_, _ = it.At()
|
||||
}
|
||||
require.NoError(t, it.Err())
|
||||
}
|
||||
require.NoError(t, ss.Err())
|
||||
|
||||
require.NoError(t, q.Close())
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Tests https://github.com/prometheus/prometheus/issues/9079.
|
||||
func TestDataMissingOnQueryDuringCompaction(t *testing.T) {
|
||||
db := newTestDB(t)
|
||||
db.DisableCompactions()
|
||||
|
||||
var (
|
||||
app = db.Appender(context.Background())
|
||||
ref = uint64(0)
|
||||
mint, maxt = int64(0), int64(0)
|
||||
err error
|
||||
)
|
||||
|
||||
// Appends samples to span over 1.5 block ranges.
|
||||
expSamples := make([]tsdbutil.Sample, 0)
|
||||
// 7 chunks with 15s scrape interval.
|
||||
for i := int64(0); i <= 120*7; i++ {
|
||||
ts := i * DefaultBlockDuration / (4 * 120)
|
||||
ref, err = app.Append(ref, labels.FromStrings("a", "b"), ts, float64(i))
|
||||
require.NoError(t, err)
|
||||
maxt = ts
|
||||
expSamples = append(expSamples, sample{ts, float64(i)})
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Get a querier before compaction (or when compaction is about to begin).
|
||||
q, err := db.Querier(context.Background(), mint, maxt)
|
||||
require.NoError(t, err)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// Compacting head while the querier spans the compaction time.
|
||||
require.NoError(t, db.Compact())
|
||||
require.Greater(t, len(db.Blocks()), 0)
|
||||
}()
|
||||
|
||||
// Give enough time for compaction to finish.
|
||||
// We expect it to be blocked until querier is closed.
|
||||
<-time.After(3 * time.Second)
|
||||
|
||||
// Querying the querier that was got before compaction.
|
||||
series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
require.Equal(t, map[string][]tsdbutil.Sample{`{a="b"}`: expSamples}, series)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestIsQuerierCollidingWithTruncation(t *testing.T) {
|
||||
db := newTestDB(t)
|
||||
db.DisableCompactions()
|
||||
|
||||
var (
|
||||
app = db.Appender(context.Background())
|
||||
ref = uint64(0)
|
||||
err error
|
||||
)
|
||||
|
||||
for i := int64(0); i <= 3000; i++ {
|
||||
ref, err = app.Append(ref, labels.FromStrings("a", "b"), i, float64(i))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// This mocks truncation.
|
||||
db.head.memTruncationInProcess.Store(true)
|
||||
db.head.lastMemoryTruncationTime.Store(2000)
|
||||
|
||||
// Test that IsQuerierValid suggests correct querier ranges.
|
||||
cases := []struct {
|
||||
mint, maxt int64 // For the querier.
|
||||
expShouldClose, expGetNew bool
|
||||
expNewMint int64
|
||||
}{
|
||||
{-200, -100, true, false, 0},
|
||||
{-200, 300, true, false, 0},
|
||||
{100, 1900, true, false, 0},
|
||||
{1900, 2200, true, true, 2000},
|
||||
{2000, 2500, false, false, 0},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(fmt.Sprintf("mint=%d,maxt=%d", c.mint, c.maxt), func(t *testing.T) {
|
||||
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(c.mint, c.maxt)
|
||||
require.Equal(t, c.expShouldClose, shouldClose)
|
||||
require.Equal(t, c.expGetNew, getNew)
|
||||
if getNew {
|
||||
require.Equal(t, c.expNewMint, newMint)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitForPendingReadersInTimeRange(t *testing.T) {
|
||||
db := newTestDB(t)
|
||||
db.DisableCompactions()
|
||||
|
||||
sampleTs := func(i int64) int64 { return i * DefaultBlockDuration / (4 * 120) }
|
||||
|
||||
var (
|
||||
app = db.Appender(context.Background())
|
||||
ref = uint64(0)
|
||||
err error
|
||||
)
|
||||
|
||||
for i := int64(0); i <= 3000; i++ {
|
||||
ts := sampleTs(i)
|
||||
ref, err = app.Append(ref, labels.FromStrings("a", "b"), ts, float64(i))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
truncMint, truncMaxt := int64(1000), int64(2000)
|
||||
cases := []struct {
|
||||
mint, maxt int64
|
||||
shouldWait bool
|
||||
}{
|
||||
{0, 500, false}, // Before truncation range.
|
||||
{500, 1500, true}, // Overlaps with truncation at the start.
|
||||
{1200, 1700, true}, // Within truncation range.
|
||||
{1800, 2500, true}, // Overlaps with truncation at the end.
|
||||
{2000, 2500, false}, // After truncation range.
|
||||
{2100, 2500, false}, // After truncation range.
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(fmt.Sprintf("mint=%d,maxt=%d,shouldWait=%t", c.mint, c.maxt, c.shouldWait), func(t *testing.T) {
|
||||
checkWaiting := func(cl io.Closer) {
|
||||
var waitOver atomic.Bool
|
||||
go func() {
|
||||
db.head.WaitForPendingReadersInTimeRange(truncMint, truncMaxt)
|
||||
waitOver.Store(true)
|
||||
}()
|
||||
<-time.After(550 * time.Millisecond)
|
||||
require.Equal(t, !c.shouldWait, waitOver.Load())
|
||||
require.NoError(t, cl.Close())
|
||||
<-time.After(550 * time.Millisecond)
|
||||
require.True(t, waitOver.Load())
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), c.mint, c.maxt)
|
||||
require.NoError(t, err)
|
||||
checkWaiting(q)
|
||||
|
||||
cq, err := db.ChunkQuerier(context.Background(), c.mint, c.maxt)
|
||||
require.NoError(t, err)
|
||||
checkWaiting(cq)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ type isolationState struct {
|
|||
incompleteAppends map[uint64]struct{}
|
||||
lowWatermark uint64 // Lowest of incompleteAppends/maxAppendID.
|
||||
isolation *isolation
|
||||
mint, maxt int64 // Time ranges of the read.
|
||||
|
||||
// Doubly linked list of active reads.
|
||||
next *isolationState
|
||||
|
@ -102,7 +103,7 @@ func (i *isolation) lowWatermarkLocked() uint64 {
|
|||
|
||||
// State returns an object used to control isolation
|
||||
// between a query and appends. Must be closed when complete.
|
||||
func (i *isolation) State() *isolationState {
|
||||
func (i *isolation) State(mint, maxt int64) *isolationState {
|
||||
i.appendMtx.RLock() // Take append mutex before read mutex.
|
||||
defer i.appendMtx.RUnlock()
|
||||
isoState := &isolationState{
|
||||
|
@ -110,6 +111,8 @@ func (i *isolation) State() *isolationState {
|
|||
lowWatermark: i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId.
|
||||
incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)),
|
||||
isolation: i,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
}
|
||||
for k := range i.appendsOpen {
|
||||
isoState.incompleteAppends[k] = struct{}{}
|
||||
|
@ -124,6 +127,21 @@ func (i *isolation) State() *isolationState {
|
|||
return isoState
|
||||
}
|
||||
|
||||
// TraverseOpenReads iterates through the open reads and runs the given
|
||||
// function on those states. The given function MUST NOT mutate the isolationState.
|
||||
// The iteration is stopped when the function returns false or once all reads have been iterated.
|
||||
func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) {
|
||||
i.readMtx.RLock()
|
||||
defer i.readMtx.RUnlock()
|
||||
s := i.readsOpen.next
|
||||
for s != i.readsOpen {
|
||||
if !f(s) {
|
||||
return
|
||||
}
|
||||
s = s.next
|
||||
}
|
||||
}
|
||||
|
||||
// newAppendID increments the transaction counter and returns a new transaction
|
||||
// ID. The first ID returned is 1.
|
||||
// Also returns the low watermark, to keep lock/unlock operations down
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"math"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -85,7 +86,7 @@ func BenchmarkIsolationWithState(b *testing.B) {
|
|||
<-start
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
s := iso.State()
|
||||
s := iso.State(math.MinInt64, math.MaxInt64)
|
||||
s.Close()
|
||||
}
|
||||
}()
|
||||
|
|
Loading…
Reference in a new issue