Optimise lowWatermark in Isolation (#7332)

* Track open appenders in doubly-linked list to make lowWatermark O(1).
* Use RW locks.
* Added BenchmarkIsolationWithState.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
This commit is contained in:
Peter Štibraný 2020-06-03 20:09:05 +02:00 committed by GitHub
parent 5e9bd17b1f
commit ff80690a6e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 171 additions and 33 deletions

View file

@ -255,9 +255,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_isolation_high_watermark", Name: "prometheus_tsdb_isolation_high_watermark",
Help: "The highest TSDB append ID that has been given out.", Help: "The highest TSDB append ID that has been given out.",
}, func() float64 { }, func() float64 {
h.iso.appendMtx.Lock() return float64(h.iso.lastAppendID())
defer h.iso.appendMtx.Unlock()
return float64(h.iso.lastAppendID)
}), }),
) )
} }

View file

@ -1677,7 +1677,7 @@ func TestIsolationWithoutAdd(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.Equals(t, hb.iso.lastAppendID, hb.iso.lowWatermark(), "High watermark should be equal to the low watermark") testutil.Equals(t, hb.iso.lastAppendID(), hb.iso.lowWatermark(), "High watermark should be equal to the low watermark")
} }
func TestOutOfOrderSamplesMetric(t *testing.T) { func TestOutOfOrderSamplesMetric(t *testing.T) {

View file

@ -38,17 +38,28 @@ func (i *isolationState) Close() {
i.prev.next = i.next i.prev.next = i.next
} }
type isolationAppender struct {
appendID uint64
prev *isolationAppender
next *isolationAppender
}
// isolation is the global isolation state. // isolation is the global isolation state.
type isolation struct { type isolation struct {
// Mutex for accessing lastAppendID and appendsOpen. // Mutex for accessing lastAppendID and appendsOpen.
appendMtx sync.Mutex appendMtx sync.RWMutex
// Each append is given an internal id.
lastAppendID uint64
// Which appends are currently in progress. // Which appends are currently in progress.
appendsOpen map[uint64]struct{} appendsOpen map[uint64]*isolationAppender
// New appenders with higher appendID are added to the end. First element keeps lastAppendId.
// appendsOpenList.next points to the first element and appendsOpenList.prev points to the last element.
// If there are no appenders, both point back to appendsOpenList.
appendsOpenList *isolationAppender
// Pool of reusable *isolationAppender to save on allocations.
appendersPool sync.Pool
// Mutex for accessing readsOpen. // Mutex for accessing readsOpen.
// If taking both appendMtx and readMtx, take appendMtx first. // If taking both appendMtx and readMtx, take appendMtx first.
readMtx sync.Mutex readMtx sync.RWMutex
// All current in use isolationStates. This is a doubly-linked list. // All current in use isolationStates. This is a doubly-linked list.
readsOpen *isolationState readsOpen *isolationState
} }
@ -58,47 +69,46 @@ func newIsolation() *isolation {
isoState.next = isoState isoState.next = isoState
isoState.prev = isoState isoState.prev = isoState
appender := &isolationAppender{}
appender.next = appender
appender.prev = appender
return &isolation{ return &isolation{
appendsOpen: map[uint64]struct{}{}, appendsOpen: map[uint64]*isolationAppender{},
readsOpen: isoState, appendsOpenList: appender,
readsOpen: isoState,
appendersPool: sync.Pool{New: func() interface{} { return &isolationAppender{} }},
} }
} }
// lowWatermark returns the appendID below which we no longer need to track // lowWatermark returns the appendID below which we no longer need to track
// which appends were from which appendID. // which appends were from which appendID.
func (i *isolation) lowWatermark() uint64 { func (i *isolation) lowWatermark() uint64 {
i.appendMtx.Lock() // Take appendMtx first. i.appendMtx.RLock() // Take appendMtx first.
defer i.appendMtx.Unlock() defer i.appendMtx.RUnlock()
i.readMtx.Lock() i.readMtx.RLock()
defer i.readMtx.Unlock() defer i.readMtx.RUnlock()
if i.readsOpen.prev != i.readsOpen { if i.readsOpen.prev != i.readsOpen {
return i.readsOpen.prev.lowWatermark return i.readsOpen.prev.lowWatermark
} }
lw := i.lastAppendID
for k := range i.appendsOpen { // Lowest appendID from appenders, or lastAppendId.
if k < lw { return i.appendsOpenList.next.appendID
lw = k
}
}
return lw
} }
// State returns an object used to control isolation // State returns an object used to control isolation
// between a query and appends. Must be closed when complete. // between a query and appends. Must be closed when complete.
func (i *isolation) State() *isolationState { func (i *isolation) State() *isolationState {
i.appendMtx.Lock() // Take append mutex before read mutex. i.appendMtx.RLock() // Take append mutex before read mutex.
defer i.appendMtx.Unlock() defer i.appendMtx.RUnlock()
isoState := &isolationState{ isoState := &isolationState{
maxAppendID: i.lastAppendID, maxAppendID: i.appendsOpenList.appendID,
lowWatermark: i.lastAppendID, lowWatermark: i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId.
incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)), incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)),
isolation: i, isolation: i,
} }
for k := range i.appendsOpen { for k := range i.appendsOpen {
isoState.incompleteAppends[k] = struct{}{} isoState.incompleteAppends[k] = struct{}{}
if k < isoState.lowWatermark {
isoState.lowWatermark = k
}
} }
i.readMtx.Lock() i.readMtx.Lock()
@ -115,15 +125,44 @@ func (i *isolation) State() *isolationState {
func (i *isolation) newAppendID() uint64 { func (i *isolation) newAppendID() uint64 {
i.appendMtx.Lock() i.appendMtx.Lock()
defer i.appendMtx.Unlock() defer i.appendMtx.Unlock()
i.lastAppendID++
i.appendsOpen[i.lastAppendID] = struct{}{} // Last used appendID is stored in head element.
return i.lastAppendID i.appendsOpenList.appendID++
app := i.appendersPool.Get().(*isolationAppender)
app.appendID = i.appendsOpenList.appendID
app.prev = i.appendsOpenList.prev
app.next = i.appendsOpenList
i.appendsOpenList.prev.next = app
i.appendsOpenList.prev = app
i.appendsOpen[app.appendID] = app
return app.appendID
}
func (i *isolation) lastAppendID() uint64 {
i.appendMtx.RLock()
defer i.appendMtx.RUnlock()
return i.appendsOpenList.appendID
} }
func (i *isolation) closeAppend(appendID uint64) { func (i *isolation) closeAppend(appendID uint64) {
i.appendMtx.Lock() i.appendMtx.Lock()
defer i.appendMtx.Unlock() defer i.appendMtx.Unlock()
delete(i.appendsOpen, appendID)
app := i.appendsOpen[appendID]
if app != nil {
app.prev.next = app.next
app.next.prev = app.prev
delete(i.appendsOpen, appendID)
// Clear all fields, and return to the pool.
*app = isolationAppender{}
i.appendersPool.Put(app)
}
} }
// The transactionID ring buffer. // The transactionID ring buffer.

101
tsdb/isolation_test.go Normal file
View file

@ -0,0 +1,101 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"strconv"
"sync"
"testing"
)
func BenchmarkIsolation(b *testing.B) {
for _, goroutines := range []int{10, 100, 1000, 10000} {
b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
iso := newIsolation()
wg := sync.WaitGroup{}
start := make(chan struct{})
for g := 0; g < goroutines; g++ {
wg.Add(1)
go func() {
defer wg.Done()
<-start
for i := 0; i < b.N; i++ {
appendID := iso.newAppendID()
_ = iso.lowWatermark()
iso.closeAppend(appendID)
}
}()
}
b.ResetTimer()
close(start)
wg.Wait()
})
}
}
func BenchmarkIsolationWithState(b *testing.B) {
for _, goroutines := range []int{10, 100, 1000, 10000} {
b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
iso := newIsolation()
wg := sync.WaitGroup{}
start := make(chan struct{})
for g := 0; g < goroutines; g++ {
wg.Add(1)
go func() {
defer wg.Done()
<-start
for i := 0; i < b.N; i++ {
appendID := iso.newAppendID()
_ = iso.lowWatermark()
iso.closeAppend(appendID)
}
}()
}
readers := goroutines / 100
if readers == 0 {
readers++
}
for g := 0; g < readers; g++ {
wg.Add(1)
go func() {
defer wg.Done()
<-start
for i := 0; i < b.N; i++ {
s := iso.State()
s.Close()
}
}()
}
b.ResetTimer()
close(start)
wg.Wait()
})
}
}