Implement isolation

This has been ported from https://github.com/prometheus/tsdb/pull/306.

Original implementation by @brian-brazil, explained in detail in the
2nd half of this talk:
https://promcon.io/2017-munich/talks/staleness-in-prometheus-2-0/

The implementation was then processed by @gouthamve into the PR linked
above. Relevant slide deck:
https://docs.google.com/presentation/d/1-ICg7PEmDHYcITykD2SR2xwg56Tzf4gr8zfz1OerY5Y/edit?usp=drivesdk

Signed-off-by: beorn7 <beorn@grafana.com>
Co-authored-by: Brian Brazil <brian.brazil@robustperception.io>
Co-authored-by: Goutham Veeramachaneni <gouthamve@gmail.com>
beorn7 2020-02-12 20:22:27 +01:00
parent 6b8181370f
commit 7f30b0984d
4 changed files with 740 additions and 226 deletions


@ -87,6 +87,10 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
}
testutil.Ok(t, it.Err())
if len(samples) == 0 {
continue
}
name := series.Labels().String()
result[name] = samples
}
@ -1276,20 +1280,29 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
lres, err := expandSeriesSet(ss)
lres, _, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, c.series, lres)
}
}
func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, error) {
result := []labels.Labels{}
// expandSeriesSet returns the raw labels in the order they are retrieved from
// the series set and the samples keyed by Labels().String().
func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, error) {
resultLabels := []labels.Labels{}
resultSamples := map[string][]sample{}
for ss.Next() {
result = append(result, ss.At().Labels())
series := ss.At()
samples := []sample{}
it := series.Iterator()
for it.Next() {
t, v := it.At()
samples = append(samples, sample{t: t, v: v})
}
return result, ss.Err()
resultLabels = append(resultLabels, series.Labels())
resultSamples[series.Labels().String()] = samples
}
return resultLabels, resultSamples, ss.Err()
}
func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
@ -2477,6 +2490,136 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
testutil.Equals(t, 1000.0, sum)
}
func TestDBCannotSeePartialCommits(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)
testutil.Ok(t, err)
defer db.Close()
stop := make(chan struct{})
firstInsert := make(chan struct{})
// Insert data in batches.
go func() {
iter := 0
for {
app := db.Appender()
for j := 0; j < 100; j++ {
_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
testutil.Ok(t, err)
}
err = app.Commit()
testutil.Ok(t, err)
if iter == 0 {
close(firstInsert)
}
iter++
select {
case <-stop:
return
default:
}
}
}()
<-firstInsert
// This is a race condition, so do a few tests to tickle it.
// Usually most will fail.
inconsistencies := 0
for i := 0; i < 10; i++ {
func() {
querier, err := db.Querier(context.Background(), 0, 1000000)
testutil.Ok(t, err)
defer querier.Close()
ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
values := map[float64]struct{}{}
for _, series := range seriesSet {
values[series[len(series)-1].v] = struct{}{}
}
if len(values) != 1 {
inconsistencies++
}
}()
}
stop <- struct{}{}
testutil.Equals(t, 0, inconsistencies, "Some queries saw inconsistent results.")
}
func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)
testutil.Ok(t, err)
defer db.Close()
querierBeforeAdd, err := db.Querier(context.Background(), 0, 1000000)
testutil.Ok(t, err)
defer querierBeforeAdd.Close()
app := db.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
querierAfterAddButBeforeCommit, err := db.Querier(context.Background(), 0, 1000000)
testutil.Ok(t, err)
defer querierAfterAddButBeforeCommit.Close()
// None of the queriers should return anything after the Add but before the commit.
ss, _, err := querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{}, seriesSet)
ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{}, seriesSet)
// This commit is after the queriers are created, so should not be returned.
err = app.Commit()
testutil.Ok(t, err)
// Nothing returned for querier created before the Add.
ss, _, err = querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{}, seriesSet)
// Series exists but has no samples for querier created after Add.
ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{`{foo="bar"}`: []sample{}}, seriesSet)
querierAfterCommit, err := db.Querier(context.Background(), 0, 1000000)
testutil.Ok(t, err)
defer querierAfterCommit.Close()
// Samples are returned for querier created after Commit.
ss, _, err = querierAfterCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}}, seriesSet)
}
// TestChunkWriter_ReadAfterWrite ensures that chunk segments are cut at the configured segment size and
// that the resulting segments include the expected chunk data.
func TestChunkWriter_ReadAfterWrite(t *testing.T) {


@ -90,6 +90,8 @@ type Head struct {
tombstones *tombstones.MemTombstones
iso *isolation
cardinalityMutex sync.Mutex
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec.
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
@ -105,8 +107,6 @@ type headMetrics struct {
chunksCreated prometheus.Counter
chunksRemoved prometheus.Counter
gcDuration prometheus.Summary
minTime prometheus.GaugeFunc
maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter
@ -119,109 +119,93 @@ type headMetrics struct {
}
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m := &headMetrics{}
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
m := &headMetrics{
activeAppenders: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_active_appenders",
Help: "Number of currently active appender transactions",
})
m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
}),
series: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_series",
Help: "Total number of series in the head block.",
}, func() float64 {
return float64(h.NumSeries())
})
m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
}),
seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_created_total",
Help: "Total number of series created in the head",
})
m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{
}),
seriesRemoved: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_removed_total",
Help: "Total number of series removed in the head",
})
m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
}),
seriesNotFound: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_not_found_total",
Help: "Total number of requests for series that were not found.",
})
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
}),
chunks: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_chunks",
Help: "Total number of chunks in the head block.",
})
m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{
}),
chunksCreated: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_chunks_created_total",
Help: "Total number of chunks created in the head",
})
m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{
}),
chunksRemoved: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_chunks_removed_total",
Help: "Total number of chunks removed in the head",
})
m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
}),
gcDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
Objectives: map[float64]float64{},
})
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_max_time",
Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
}, func() float64 {
return float64(h.MaxTime())
})
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_min_time",
Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
}, func() float64 {
return float64(h.MinTime())
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
}),
walTruncateDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
Objectives: map[float64]float64{},
})
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
}),
walCorruptionsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_corruptions_total",
Help: "Total number of WAL corruptions.",
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
}),
samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.",
})
m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{
}),
headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total",
Help: "Total number of head truncations that failed.",
})
m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
}),
headTruncateTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_total",
Help: "Total number of head truncations attempted.",
})
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
}),
checkpointDeleteFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
})
m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
}),
checkpointDeleteTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_total",
Help: "Total number of checkpoint deletions attempted.",
})
m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
}),
checkpointCreationFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_failed_total",
Help: "Total number of checkpoint creations that failed.",
})
m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
}),
checkpointCreationTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.",
})
}),
}
if r != nil {
r.MustRegister(
m.activeAppenders,
m.series,
m.chunks,
m.chunksCreated,
m.chunksRemoved,
m.series,
m.seriesCreated,
m.seriesRemoved,
m.seriesNotFound,
m.minTime,
m.maxTime,
m.gcDuration,
m.walTruncateDuration,
m.walCorruptionsTotal,
@ -232,6 +216,34 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.checkpointDeleteTotal,
m.checkpointCreationFail,
m.checkpointCreationTotal,
// Metrics bound to functions and not needed in tests
// can be created and registered on the spot.
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_max_time",
Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
}, func() float64 {
return float64(h.MaxTime())
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_min_time",
Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
}, func() float64 {
return float64(h.MinTime())
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_isolation_low_watermark",
Help: "The lowest TSDB append ID that is still referenced.",
}, func() float64 {
return float64(h.iso.lowWatermark())
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_isolation_high_watermark",
Help: "The highest TSDB append ID that has been given out.",
}, func() float64 {
h.iso.appendMtx.Lock()
defer h.iso.appendMtx.Unlock()
return float64(h.iso.lastAppendID)
}),
)
}
return m
@ -279,6 +291,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(),
tombstones: tombstones.NewMemTombstones(),
iso: newIsolation(),
deleted: map[uint64]int{},
}
h.metrics = newHeadMetrics(h, r)
@ -314,8 +327,7 @@ func (h *Head) processWALSamples(
}
refSeries[s.Ref] = ms
}
_, chunkCreated := ms.append(s.T, s.V)
if chunkCreated {
if _, chunkCreated := ms.append(s.T, s.V, 0); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
@ -564,7 +576,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
}
// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime = minValidTime
@ -775,7 +787,7 @@ func (h *RangeHead) Index(mint, maxt int64) (IndexReader, error) {
}
func (h *RangeHead) Chunks() (ChunkReader, error) {
return h.head.chunksRange(h.mint, h.maxt), nil
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()), nil
}
func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
@ -810,6 +822,8 @@ func (h *RangeHead) Meta() BlockMeta {
type initAppender struct {
app storage.Appender
head *Head
appendID, cleanupAppendIDsBelow uint64
}
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@ -817,7 +831,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
return a.app.Add(lset, t, v)
}
a.head.initTime(t)
a.app = a.head.appender()
a.app = a.head.appender(a.appendID, a.cleanupAppendIDsBelow)
return a.app.Add(lset, t, v)
}
@ -847,15 +861,22 @@ func (a *initAppender) Rollback() error {
func (h *Head) Appender() storage.Appender {
h.metrics.activeAppenders.Inc()
appendID := h.iso.newAppendID()
cleanupAppendIDsBelow := h.iso.lowWatermark()
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if h.MinTime() == math.MaxInt64 {
return &initAppender{head: h}
return &initAppender{
head: h,
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
}
return h.appender()
}
return h.appender(appendID, cleanupAppendIDsBelow)
}
func (h *Head) appender() *headAppender {
func (h *Head) appender(appendID, cleanupAppendIDsBelow uint64) *headAppender {
return &headAppender{
head: h,
// Set the minimum valid time to whichever is greater, the head min valid time or the compaction window.
@ -865,6 +886,8 @@ func (h *Head) appender() *headAppender {
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
sampleSeries: h.getSeriesBuffer(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
}
}
@ -922,6 +945,8 @@ type headAppender struct {
series []record.RefSeries
samples []record.RefSample
sampleSeries []*memSeries
appendID, cleanupAppendIDsBelow uint64
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@ -1023,7 +1048,8 @@ func (a *headAppender) Commit() error {
for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
ok, chunkCreated := series.append(s.T, s.V)
ok, chunkCreated := series.append(s.T, s.V, a.appendID)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
@ -1038,6 +1064,7 @@ func (a *headAppender) Commit() error {
a.head.metrics.samplesAppended.Add(float64(total))
a.head.updateMinMaxTime(a.mint, a.maxt)
a.head.iso.closeAppend(a.appendID)
return nil
}
@ -1048,14 +1075,16 @@ func (a *headAppender) Rollback() error {
for i := range a.samples {
series = a.sampleSeries[i]
series.Lock()
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
a.head.putAppendBuffer(a.samples)
a.samples = nil
a.head.iso.closeAppend(a.appendID)
// Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case.
a.samples = nil
return a.log()
}
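To see how the appender-side pieces fit together, here is a condensed, illustrative sketch of the path an appendID takes from creation to Commit or Rollback. The helper name and values are invented, it is written as if it lived inside package tsdb, and it is not part of the diff:

// appenderFlowSketch is a hypothetical helper, not part of this commit.
func appenderFlowSketch(h *Head) error {
	appendID := h.iso.newAppendID()      // New transaction ID for this batch.
	cleanupBelow := h.iso.lowWatermark() // Per-series IDs below this are no longer needed.
	app := h.appender(appendID, cleanupBelow)

	if _, err := app.Add(labels.FromStrings("foo", "bar"), 1000, 1.0); err != nil {
		// Rollback still calls h.iso.closeAppend(appendID), so a failed
		// batch never blocks the low watermark (see TestIsolationRollback).
		_ = app.Rollback()
		return err
	}
	// Commit stamps every buffered sample with appendID (memSeries.append),
	// trims per-series IDs below cleanupBelow, then closes the append.
	return app.Commit()
}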
@ -1182,14 +1211,19 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
// Chunks returns a ChunkReader against the block.
func (h *Head) Chunks() (ChunkReader, error) {
return h.chunksRange(math.MinInt64, math.MaxInt64), nil
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()), nil
}
func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReader {
if hmin := h.MinTime(); hmin > mint {
mint = hmin
}
return &headChunkReader{head: h, mint: mint, maxt: maxt}
return &headChunkReader{
head: h,
mint: mint,
maxt: maxt,
isoState: is,
}
}
// NumSeries returns the number of active series in the head.
@ -1240,9 +1274,11 @@ func (h *Head) Close() error {
type headChunkReader struct {
head *Head
mint, maxt int64
isoState *isolationState
}
func (h *headChunkReader) Close() error {
h.isoState.Close()
return nil
}
@ -1287,6 +1323,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
Chunk: c.chunk,
s: s,
cid: int(cid),
isoState: h.isoState,
}, nil
}
@ -1294,11 +1331,12 @@ type safeChunk struct {
chunkenc.Chunk
s *memSeries
cid int
isoState *isolationState
}
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid, reuseIter)
it := c.s.iterator(c.cid, c.isoState, reuseIter)
c.s.Unlock()
return it
}
@ -1698,6 +1736,8 @@ type memSeries struct {
pendingCommit bool // Whether there are samples waiting to be committed to this series.
app chunkenc.Appender // Current appender for the chunk.
txs *txRing
}
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
@ -1706,6 +1746,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
txs: newTxRing(4),
}
return s
}
@ -1805,8 +1846,9 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
return k
}
// append adds the sample (t, v) to the series.
func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
// append adds the sample (t, v) to the series. The caller also has to provide
// the appendID for isolation.
func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger than this has diminishing returns and increases
// the time range within which we have to decompress all samples.
@ -1843,11 +1885,19 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}
s.txs.add(appendID)
return true, chunkCreated
}
// computeChunkEndTime estimates the end timestamp based the beginning of a chunk,
// its current timestamp and the upper bound up to which we insert data.
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
// acquiring lock.
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
s.txs.cleanupAppendIDsBelow(bound)
}
// computeChunkEndTime estimates the end timestamp based on the beginning of a
// chunk, its current timestamp and the upper bound up to which we insert data.
// It assumes that the time range is 1/4 full.
func computeChunkEndTime(start, cur, max int64) int64 {
a := (max - start) / ((cur - start + 1) * 4)
@ -1857,31 +1907,92 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/a
}
func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator {
func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator {
c := s.chunk(id)
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
// which got then garbage collected before it got accessed.
// We must ensure to not garbage collect as long as any readers still hold a reference.
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any
// readers still hold a reference.
if c == nil {
return chunkenc.NewNopIterator()
}
ix := id - s.firstChunkID
numSamples := c.chunk.NumSamples()
stopAfter := numSamples
if isoState != nil {
totalSamples := 0 // Total samples in this series.
previousSamples := 0 // Samples before this chunk.
for j, d := range s.chunks {
totalSamples += d.chunk.NumSamples()
if j < ix {
previousSamples += d.chunk.NumSamples()
}
}
// Subtract the transactionIDs that belong to samples coming after this
// chunk, so only the IDs relevant up to and including this chunk remain.
appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples))
// Iterate over the appendIDs, find the first one that the isolation state says not
// to return.
it := s.txs.iterator()
for index := 0; index < appendIDsToConsider; index++ {
appendID := it.At()
if appendID <= isoState.maxAppendID { // Easy check first.
if _, ok := isoState.incompleteAppends[appendID]; !ok {
it.Next()
continue
}
}
stopAfter = numSamples - (appendIDsToConsider - index)
if stopAfter < 0 {
stopAfter = 0 // Stopped in a previous chunk.
}
break
}
}
if stopAfter == 0 {
return chunkenc.NewNopIterator()
}
if id-s.firstChunkID < len(s.chunks)-1 {
if stopAfter == numSamples {
return c.chunk.Iterator(it)
}
if msIter, ok := it.(*stopIterator); ok {
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
msIter.i = -1
msIter.stopAfter = stopAfter
return msIter
}
return &stopIterator{
Iterator: c.chunk.Iterator(it),
i: -1,
stopAfter: stopAfter,
}
}
// Serve the last 4 samples for the last chunk from the sample buffer
// as their compressed bytes may be mutated by added samples.
if msIter, ok := it.(*memSafeIterator); ok {
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
msIter.i = -1
msIter.total = c.chunk.NumSamples()
msIter.total = numSamples
msIter.stopAfter = stopAfter
msIter.buf = s.sampleBuf
return msIter
}
return &memSafeIterator{
stopIterator: stopIterator{
Iterator: c.chunk.Iterator(it),
i: -1,
total: c.chunk.NumSamples(),
stopAfter: stopAfter,
},
total: numSamples,
buf: s.sampleBuf,
}
}
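The stopAfter bookkeeping above is the densest part of this change, so the following standalone sketch replays the same arithmetic with made-up numbers (the function and its inputs are illustrative, not part of the commit): a series with two chunks of 120 and 10 samples, a ring that remembers the last 8 appendIDs, and a reader whose isolation state only admits appendIDs up to 100.

package main

import "fmt"

// computeStopAfter mirrors the counting in memSeries.iterator: chunkSizes
// holds the number of samples per chunk, ringIDs the most recent appendIDs
// (oldest first, one per recent sample), ix the chunk being read, and
// maxAppendID/incomplete the reader's isolation state.
func computeStopAfter(chunkSizes []int, ringIDs []uint64, ix int, maxAppendID uint64, incomplete map[uint64]struct{}) int {
	total, previous := 0, 0
	for j, n := range chunkSizes {
		total += n
		if j < ix {
			previous += n
		}
	}
	numSamples := chunkSizes[ix]
	stopAfter := numSamples
	// Ring entries belonging to samples after this chunk are irrelevant here.
	consider := len(ringIDs) - (total - (previous + numSamples))
	for index := 0; index < consider; index++ {
		id := ringIDs[index]
		if _, open := incomplete[id]; id <= maxAppendID && !open {
			continue // This sample is visible to the reader.
		}
		if stopAfter = numSamples - (consider - index); stopAfter < 0 {
			stopAfter = 0 // Stopped in a previous chunk already.
		}
		break
	}
	return stopAfter
}

func main() {
	// Chunk 1 (the head chunk) has 10 samples; the ring remembers appendIDs
	// 95..102 for the last 8 samples. A reader whose maxAppendID is 100 must
	// stop after the first 8 samples of that chunk.
	fmt.Println(computeStopAfter([]int{120, 10}, []uint64{95, 96, 97, 98, 99, 100, 101, 102}, 1, 100, nil))
}

Running it prints 8: the reader gets the first eight of the ten samples in the head chunk, because the last two were appended by transactions it must not see.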
@ -1900,16 +2011,29 @@ func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
}
type memSafeIterator struct {
type stopIterator struct {
chunkenc.Iterator
i int
i, stopAfter int
}
func (it *stopIterator) Next() bool {
if it.i+1 >= it.stopAfter {
return false
}
it.i++
return it.Iterator.Next()
}
type memSafeIterator struct {
stopIterator
total int
buf [4]sample
}
func (it *memSafeIterator) Next() bool {
if it.i+1 >= it.total {
if it.i+1 >= it.stopAfter {
return false
}
it.i++


@ -243,9 +243,9 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Ok(t, c.Err())
return x
}
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil)))
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, nil)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, nil)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, nil)))
})
}
}
@ -296,7 +296,16 @@ func TestHead_WALMultiRef(t *testing.T) {
}
func TestHead_Truncate(t *testing.T) {
h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize)
dir, err := ioutil.TempDir("", "test_truncate")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
@ -308,18 +317,18 @@ func TestHead_Truncate(t *testing.T) {
s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
s1.chunks = []*memChunk{
{minTime: 0, maxTime: 999},
{minTime: 1000, maxTime: 1999},
{minTime: 2000, maxTime: 2999},
{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
}
s2.chunks = []*memChunk{
{minTime: 1000, maxTime: 1999},
{minTime: 2000, maxTime: 2999},
{minTime: 3000, maxTime: 3999},
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
{minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()},
}
s3.chunks = []*memChunk{
{minTime: 0, maxTime: 999},
{minTime: 1000, maxTime: 1999},
{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
}
s4.chunks = []*memChunk{}
@ -329,12 +338,12 @@ func TestHead_Truncate(t *testing.T) {
testutil.Ok(t, h.Truncate(2000))
testutil.Equals(t, []*memChunk{
{minTime: 2000, maxTime: 2999},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
}, h.series.getByID(s1.ref).chunks)
testutil.Equals(t, []*memChunk{
{minTime: 2000, maxTime: 2999},
{minTime: 3000, maxTime: 3999},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
{minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()},
}, h.series.getByID(s2.ref).chunks)
testutil.Assert(t, h.series.getByID(s3.ref) == nil, "")
@ -375,7 +384,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i))
ok, _ := s.append(int64(i), float64(i), 0)
testutil.Assert(t, ok == true, "sample append failed")
}
@ -397,11 +406,11 @@ func TestMemSeries_truncateChunks(t *testing.T) {
// Validate that the series' sample buffer is applied correctly to the last chunk
// after truncation.
it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil)
it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil, nil)
_, ok := it1.(*memSafeIterator)
testutil.Assert(t, ok == true, "")
it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil)
it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil, nil)
_, ok = it2.(*memSafeIterator)
testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
}
@ -921,19 +930,19 @@ func TestMemSeries_append(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.append(998, 1)
ok, chunkCreated := s.append(998, 1, 0)
testutil.Assert(t, ok, "append failed")
testutil.Assert(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.append(999, 2)
ok, chunkCreated = s.append(999, 2, 0)
testutil.Assert(t, ok, "append failed")
testutil.Assert(t, !chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.append(1000, 3)
ok, chunkCreated = s.append(1000, 3, 0)
testutil.Assert(t, ok, "append failed")
testutil.Assert(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.append(1001, 4)
ok, chunkCreated = s.append(1001, 4, 0)
testutil.Assert(t, ok, "append failed")
testutil.Assert(t, !chunkCreated, "second sample should use same chunk")
@ -943,7 +952,7 @@ func TestMemSeries_append(t *testing.T) {
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
// at approximately 120 samples per chunk.
for i := 1; i < 1000; i++ {
ok, _ := s.append(1001+int64(i), float64(i))
ok, _ := s.append(1001+int64(i), float64(i), 0)
testutil.Assert(t, ok, "append failed")
}
@ -966,18 +975,18 @@ func TestGCChunkAccess(t *testing.T) {
s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0)
ok, chunkCreated := s.append(0, 0, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999)
ok, chunkCreated = s.append(999, 999, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000)
ok, chunkCreated = s.append(1000, 1000, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999)
ok, chunkCreated = s.append(1999, 1999, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunks was created")
@ -993,7 +1002,7 @@ func TestGCChunkAccess(t *testing.T) {
}}, lset)
testutil.Equals(t, 2, len(chunks))
cr := h.chunksRange(0, 1500)
cr := h.chunksRange(0, 1500, nil)
_, err = cr.Chunk(chunks[0].Ref)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[1].Ref)
@ -1018,18 +1027,18 @@ func TestGCSeriesAccess(t *testing.T) {
s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0)
ok, chunkCreated := s.append(0, 0, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999)
ok, chunkCreated = s.append(999, 999, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000)
ok, chunkCreated = s.append(1000, 1000, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999)
ok, chunkCreated = s.append(1999, 1999, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunks was created")
@ -1045,7 +1054,7 @@ func TestGCSeriesAccess(t *testing.T) {
}}, lset)
testutil.Equals(t, 2, len(chunks))
cr := h.chunksRange(0, 2000)
cr := h.chunksRange(0, 2000, nil)
_, err = cr.Chunk(chunks[0].Ref)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[1].Ref)
@ -1068,7 +1077,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
h.initTime(0)
app := h.appender()
app := h.appender(0, 0)
lset := labels.FromStrings("a", "1")
_, err = app.Add(lset, 2100, 1)
testutil.Ok(t, err)
@ -1096,7 +1105,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
h.initTime(0)
app := h.appender()
app := h.appender(0, 0)
lset := labels.FromStrings("a", "1")
_, err = app.Add(lset, 2100, 1)
testutil.Ok(t, err)
@ -1416,28 +1425,29 @@ func TestMemSeriesIsolation(t *testing.T) {
testutil.Ok(t, err)
defer hb.Close()
lastValue := func(maxWriteId uint64) int {
idx, err := hb.Index()
lastValue := func(maxAppendID uint64) int {
idx, err := hb.Index(hb.MinTime(), hb.MaxTime())
testutil.Ok(t, err)
iso := hb.iso.State()
iso.maxWriteID = maxWriteId
iso.maxAppendID = maxAppendID
querier := &blockQuerier{
mint: 0,
maxt: 10000,
index: idx,
chunks: hb.chunksRange(math.MinInt64, math.MaxInt64, iso),
tombstones: emptyTombstoneReader,
tombstones: tombstones.NewMemTombstones(),
}
testutil.Ok(t, err)
defer querier.Close()
ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
seriesSet := readSeriesSet(t, ss)
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
for _, series := range seriesSet {
return int(series[len(series)-1].v)
}
@ -1446,20 +1456,20 @@ func TestMemSeriesIsolation(t *testing.T) {
i := 0
for ; i <= 1000; i++ {
var app Appender
// To initialise bounds.
if hb.MinTime() == math.MinInt64 {
app = &initAppender{head: hb, writeID: uint64(i), cleanupWriteIDsBelow: 0}
var app storage.Appender
// To initialize bounds.
if hb.MinTime() == math.MaxInt64 {
app = &initAppender{head: hb, appendID: uint64(i), cleanupAppendIDsBelow: 0}
} else {
app = hb.appender(uint64(i), 0)
}
_, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err, "Failed to add sample")
testutil.Ok(t, app.Commit(), "Unexpected error committing appender")
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
}
// Test simple cases in different chunks when no writeId cleanup has been performed.
// Test simple cases in different chunks when no appendID cleanup has been performed.
testutil.Equals(t, 10, lastValue(10))
testutil.Equals(t, 130, lastValue(130))
testutil.Equals(t, 160, lastValue(160))
@ -1469,15 +1479,15 @@ func TestMemSeriesIsolation(t *testing.T) {
testutil.Equals(t, 995, lastValue(995))
testutil.Equals(t, 999, lastValue(999))
// Cleanup writeIds below 500.
// Cleanup appendIDs below 500.
app := hb.appender(uint64(i), 500)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err, "Failed to add sample")
testutil.Ok(t, app.Commit(), "Unexpected error committing appender")
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
i++
// We should not get queries with a maxWriteId below 500 after the cleanup,
// but they only take the remaining writeIds into account.
// We should not get queries with a maxAppendID below 500 after the cleanup,
// but they only take the remaining appendIDs into account.
testutil.Equals(t, 499, lastValue(10))
testutil.Equals(t, 499, lastValue(130))
testutil.Equals(t, 499, lastValue(160))
@ -1486,49 +1496,87 @@ func TestMemSeriesIsolation(t *testing.T) {
testutil.Equals(t, 995, lastValue(995))
testutil.Equals(t, 999, lastValue(999))
// Cleanup writeIds below 1000, which means the sample buffer is
// the only thing with writeIds.
// Cleanup appendIDs below 1000, which means the sample buffer is
// the only thing with appendIDs.
app = hb.appender(uint64(i), 1000)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err, "Failed to add sample")
testutil.Ok(t, app.Commit(), "Unexpected error committing appender")
i++
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, 999, lastValue(998))
testutil.Equals(t, 999, lastValue(999))
testutil.Equals(t, 1000, lastValue(1000))
testutil.Equals(t, 1001, lastValue(1001))
testutil.Equals(t, 1002, lastValue(1002))
testutil.Equals(t, 1002, lastValue(1003))
}
func TestHead_Truncate_WriteIDs(t *testing.T) {
h, err := NewHead(nil, nil, nil, 1000)
i++
// Cleanup appendIDs below 1001, but with a rollback.
app = hb.appender(uint64(i), 1001)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
defer h.Close()
testutil.Ok(t, app.Rollback())
testutil.Equals(t, 1000, lastValue(999))
testutil.Equals(t, 1000, lastValue(1000))
testutil.Equals(t, 1001, lastValue(1001))
testutil.Equals(t, 1002, lastValue(1002))
testutil.Equals(t, 1002, lastValue(1003))
}
h.initTime(0)
s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
chk := chunkenc.NewXORChunk()
app, err := chk.Appender()
func TestIsolationRollback(t *testing.T) {
// Rollback after a failed append and test if the low watermark has progressed anyway.
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
app.Append(1, 0)
app.Append(2, 0)
app.Append(3, 0)
app := hb.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark())
s1.chunks = []*memChunk{
{minTime: 0, maxTime: 999, chunk: chk},
{minTime: 1000, maxTime: 1999, chunk: chk},
app = hb.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err)
_, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
testutil.NotOk(t, err)
testutil.Ok(t, app.Rollback())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark())
app = hb.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, uint64(3), hb.iso.lowWatermark(), "Low watermark should proceed to 3 even if append #2 was rolled back.")
}
s1.txs.txIDs = []uint64{2, 3, 4, 5, 0, 0, 0, 1}
s1.txs.txIDFirst = 7
s1.txs.txIDCount = 5
func TestIsolationLowWatermarkMonotonous(t *testing.T) {
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
testutil.Ok(t, h.Truncate(1000))
testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.txs.txIDs)
testutil.Equals(t, 0, s1.txs.txIDFirst)
testutil.Equals(t, 3, s1.txs.txIDCount)
app1 := hb.Appender()
_, err = app1.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app1.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark())
app1 = hb.Appender()
_, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err)
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.")
app2 := hb.Appender()
_, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1)
testutil.Ok(t, err)
testutil.Ok(t, app2.Commit())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not committed yet.")
is := hb.iso.State()
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "After simulated read (iso state retrieved), low watermark should stay at 2.")
testutil.Ok(t, app1.Commit())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Even after app1 is committed, low watermark should stay at 2 because read is still ongoing.")
is.Close()
testutil.Equals(t, uint64(3), hb.iso.lowWatermark(), "After read has finished (iso state closed), low watermark should jump to three.")
}

tsdb/isolation.go (new file, 199 lines added)

@ -0,0 +1,199 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"sync"
)
// isolationState holds the isolation information.
type isolationState struct {
// We will ignore all appends above the max, or that are incomplete.
maxAppendID uint64
incompleteAppends map[uint64]struct{}
lowWatermark uint64 // Lowest of incompleteAppends/maxAppendID.
isolation *isolation
// Doubly linked list of active reads.
next *isolationState
prev *isolationState
}
// Close closes the state.
func (i *isolationState) Close() {
i.isolation.readMtx.Lock()
defer i.isolation.readMtx.Unlock()
i.next.prev = i.prev
i.prev.next = i.next
}
// isolation is the global isolation state.
type isolation struct {
// Mutex for accessing lastAppendID and appendsOpen.
appendMtx sync.Mutex
// Each append is given an internal id.
lastAppendID uint64
// Which appends are currently in progress.
appendsOpen map[uint64]struct{}
// Mutex for accessing readsOpen.
// If taking both appendMtx and readMtx, take appendMtx first.
readMtx sync.Mutex
// All isolationStates that are currently in use. This is a doubly-linked list.
readsOpen *isolationState
}
func newIsolation() *isolation {
isoState := &isolationState{}
isoState.next = isoState
isoState.prev = isoState
return &isolation{
appendsOpen: map[uint64]struct{}{},
readsOpen: isoState,
}
}
// lowWatermark returns the appendID below which we no longer need to track
// which appends were from which appendID.
func (i *isolation) lowWatermark() uint64 {
i.appendMtx.Lock() // Take appendMtx first.
defer i.appendMtx.Unlock()
i.readMtx.Lock()
defer i.readMtx.Unlock()
if i.readsOpen.prev != i.readsOpen {
return i.readsOpen.prev.lowWatermark
}
lw := i.lastAppendID
for k := range i.appendsOpen {
if k < lw {
lw = k
}
}
return lw
}
// State returns an object used to control isolation
// between a query and appends. Must be closed when complete.
func (i *isolation) State() *isolationState {
i.appendMtx.Lock() // Take append mutex before read mutex.
defer i.appendMtx.Unlock()
isoState := &isolationState{
maxAppendID: i.lastAppendID,
lowWatermark: i.lastAppendID,
incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)),
isolation: i,
}
for k := range i.appendsOpen {
isoState.incompleteAppends[k] = struct{}{}
if k < isoState.lowWatermark {
isoState.lowWatermark = k
}
}
i.readMtx.Lock()
defer i.readMtx.Unlock()
isoState.prev = i.readsOpen
isoState.next = i.readsOpen.next
i.readsOpen.next.prev = isoState
i.readsOpen.next = isoState
return isoState
}
// newAppendID increments the transaction counter and returns a new transaction ID.
func (i *isolation) newAppendID() uint64 {
i.appendMtx.Lock()
defer i.appendMtx.Unlock()
i.lastAppendID++
i.appendsOpen[i.lastAppendID] = struct{}{}
return i.lastAppendID
}
func (i *isolation) closeAppend(appendID uint64) {
i.appendMtx.Lock()
defer i.appendMtx.Unlock()
delete(i.appendsOpen, appendID)
}
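Before the per-series ring buffer below, a quick test-style sketch of how these pieces are meant to interact, in the spirit of TestIsolationLowWatermarkMonotonous above (the test name is invented, it assumes package-internal access, and it is not part of the committed file):

func TestIsolationLifecycleSketch(t *testing.T) {
	iso := newIsolation()

	a1 := iso.newAppendID() // Writer 1 starts; a1 == 1.

	// A reader starting now records a1 as incomplete, so it must skip
	// samples stamped with a1 even if that append commits later.
	st := iso.State()
	if _, open := st.incompleteAppends[a1]; !open {
		t.Fatal("expected append 1 to be in flight for this reader")
	}
	iso.closeAppend(a1) // Commit or Rollback ends the append.

	// A later append completes too, but the open read pins the low
	// watermark at a1, so per-series appendIDs >= a1 are kept around.
	a2 := iso.newAppendID()
	iso.closeAppend(a2)
	if lw := iso.lowWatermark(); lw != a1 {
		t.Fatalf("unexpected low watermark %d", lw)
	}

	st.Close() // The read finishes; the watermark may advance to a2.
	if lw := iso.lowWatermark(); lw != a2 {
		t.Fatalf("unexpected low watermark %d", lw)
	}
}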
// The transactionID ring buffer.
type txRing struct {
txIDs []uint64
txIDFirst int // Position of the first id in the ring.
txIDCount int // How many ids in the ring.
}
func newTxRing(cap int) *txRing {
return &txRing{
txIDs: make([]uint64, cap),
}
}
func (txr *txRing) add(appendID uint64) {
if txr.txIDCount == len(txr.txIDs) {
// Ring buffer is full, expand by doubling.
newRing := make([]uint64, txr.txIDCount*2)
idx := copy(newRing[:], txr.txIDs[txr.txIDFirst:])
copy(newRing[idx:], txr.txIDs[:txr.txIDFirst])
txr.txIDs = newRing
txr.txIDFirst = 0
}
txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = appendID
txr.txIDCount++
}
func (txr *txRing) cleanupAppendIDsBelow(bound uint64) {
pos := txr.txIDFirst
for txr.txIDCount > 0 {
if txr.txIDs[pos] < bound {
txr.txIDFirst++
txr.txIDCount--
} else {
break
}
pos++
if pos == len(txr.txIDs) {
pos = 0
}
}
txr.txIDFirst %= len(txr.txIDs)
}
func (txr *txRing) iterator() *txRingIterator {
return &txRingIterator{
pos: txr.txIDFirst,
ids: txr.txIDs,
}
}
// txRingIterator lets you iterate over the ring. It doesn't terminate on
// its own; it wraps around once it reaches the end of the ring.
type txRingIterator struct {
ids []uint64
pos int
}
func (it *txRingIterator) At() uint64 {
return it.ids[it.pos]
}
func (it *txRingIterator) Next() {
it.pos++
if it.pos == len(it.ids) {
it.pos = 0
}
}
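Finally, an illustrative sketch of how the per-series ring is driven (again assuming package-internal access; the function name and values are invented, not part of the commit): Commit adds one appendID per sample, readers walk the ring oldest-to-newest bounded by their own count, and cleanupAppendIDsBelow drops IDs once no reader can need them.

// txRingSketch is a hypothetical illustration, not part of this commit.
func txRingSketch() {
	txs := newTxRing(4)

	// One entry per appended sample, oldest first; the fifth add doubles
	// the ring's capacity from 4 to 8.
	for id := uint64(1); id <= 5; id++ {
		txs.add(id)
	}

	// The iterator wraps around forever, so callers bound it themselves,
	// exactly as memSeries.iterator does with appendIDsToConsider.
	it := txs.iterator()
	for i := 0; i < txs.txIDCount; i++ {
		_ = it.At() // Yields 1, 2, 3, 4, 5 in order.
		it.Next()
	}

	// Once the low watermark has passed 3, drop the old IDs: only 4 and 5
	// remain (txIDCount goes from 5 to 2).
	txs.cleanupAppendIDsBelow(4)
}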