prometheus/tsdb/isolation.go

200 lines
4.8 KiB
Go
Raw Normal View History

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"sync"
)
// isolationState holds the isolation information.
type isolationState struct {
// We will ignore all appends above the max, or that are incomplete.
maxAppendID uint64
incompleteAppends map[uint64]struct{}
lowWatermark uint64 // Lowest of incompleteAppends/maxAppendID.
isolation *isolation
// Doubly linked list of active reads.
next *isolationState
prev *isolationState
}
// Close closes the state.
func (i *isolationState) Close() {
i.isolation.readMtx.Lock()
defer i.isolation.readMtx.Unlock()
i.next.prev = i.prev
i.prev.next = i.next
}
// isolation is the global isolation state.
type isolation struct {
// Mutex for accessing lastAppendID and appendsOpen.
appendMtx sync.Mutex
// Each append is given an internal id.
lastAppendID uint64
// Which appends are currently in progress.
appendsOpen map[uint64]struct{}
// Mutex for accessing readsOpen.
// If taking both appendMtx and readMtx, take appendMtx first.
readMtx sync.Mutex
// All current in use isolationStates. This is a doubly-linked list.
readsOpen *isolationState
}
func newIsolation() *isolation {
isoState := &isolationState{}
isoState.next = isoState
isoState.prev = isoState
return &isolation{
appendsOpen: map[uint64]struct{}{},
readsOpen: isoState,
}
}
// lowWatermark returns the appendID below which we no longer need to track
// which appends were from which appendID.
func (i *isolation) lowWatermark() uint64 {
i.appendMtx.Lock() // Take appendMtx first.
defer i.appendMtx.Unlock()
i.readMtx.Lock()
defer i.readMtx.Unlock()
if i.readsOpen.prev != i.readsOpen {
return i.readsOpen.prev.lowWatermark
}
lw := i.lastAppendID
for k := range i.appendsOpen {
if k < lw {
lw = k
}
}
return lw
}
// State returns an object used to control isolation
// between a query and appends. Must be closed when complete.
func (i *isolation) State() *isolationState {
i.appendMtx.Lock() // Take append mutex before read mutex.
defer i.appendMtx.Unlock()
isoState := &isolationState{
maxAppendID: i.lastAppendID,
lowWatermark: i.lastAppendID,
incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)),
isolation: i,
}
for k := range i.appendsOpen {
isoState.incompleteAppends[k] = struct{}{}
if k < isoState.lowWatermark {
isoState.lowWatermark = k
}
}
i.readMtx.Lock()
defer i.readMtx.Unlock()
isoState.prev = i.readsOpen
isoState.next = i.readsOpen.next
i.readsOpen.next.prev = isoState
i.readsOpen.next = isoState
return isoState
}
// newAppendID increments the transaction counter and returns a new transaction ID.
func (i *isolation) newAppendID() uint64 {
i.appendMtx.Lock()
defer i.appendMtx.Unlock()
i.lastAppendID++
i.appendsOpen[i.lastAppendID] = struct{}{}
return i.lastAppendID
}
func (i *isolation) closeAppend(appendID uint64) {
i.appendMtx.Lock()
defer i.appendMtx.Unlock()
delete(i.appendsOpen, appendID)
}
// The transactionID ring buffer.
type txRing struct {
txIDs []uint64
txIDFirst int // Position of the first id in the ring.
txIDCount int // How many ids in the ring.
}
func newTxRing(cap int) *txRing {
return &txRing{
txIDs: make([]uint64, cap),
}
}
func (txr *txRing) add(appendID uint64) {
if txr.txIDCount == len(txr.txIDs) {
// Ring buffer is full, expand by doubling.
newRing := make([]uint64, txr.txIDCount*2)
idx := copy(newRing[:], txr.txIDs[txr.txIDFirst:])
copy(newRing[idx:], txr.txIDs[:txr.txIDFirst])
txr.txIDs = newRing
txr.txIDFirst = 0
}
txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = appendID
txr.txIDCount++
}
func (txr *txRing) cleanupAppendIDsBelow(bound uint64) {
pos := txr.txIDFirst
for txr.txIDCount > 0 {
if txr.txIDs[pos] < bound {
txr.txIDFirst++
txr.txIDCount--
} else {
break
}
pos++
if pos == len(txr.txIDs) {
pos = 0
}
}
txr.txIDFirst %= len(txr.txIDs)
}
func (txr *txRing) iterator() *txRingIterator {
return &txRingIterator{
pos: txr.txIDFirst,
ids: txr.txIDs,
}
}
// txRingIterator lets you iterate over the ring. It doesn't terminate,
// it DOESN'T terminate.
type txRingIterator struct {
ids []uint64
pos int
}
func (it *txRingIterator) At() uint64 {
return it.ids[it.pos]
}
func (it *txRingIterator) Next() {
it.pos++
if it.pos == len(it.ids) {
it.pos = 0
}
}