diff --git a/tsdb/head.go b/tsdb/head.go
index 5cbaefc1e..0a16671ea 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -255,9 +255,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 				Name: "prometheus_tsdb_isolation_high_watermark",
 				Help: "The highest TSDB append ID that has been given out.",
 			}, func() float64 {
-				h.iso.appendMtx.Lock()
-				defer h.iso.appendMtx.Unlock()
-				return float64(h.iso.lastAppendID)
+				return float64(h.iso.lastAppendID())
 			}),
 		)
 	}
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index f95c16a20..e717b07b7 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -1677,7 +1677,7 @@ func TestIsolationWithoutAdd(t *testing.T) {
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 
-	testutil.Equals(t, hb.iso.lastAppendID, hb.iso.lowWatermark(), "High watermark should be equal to the low watermark")
+	testutil.Equals(t, hb.iso.lastAppendID(), hb.iso.lowWatermark(), "High watermark should be equal to the low watermark")
 }
 
 func TestOutOfOrderSamplesMetric(t *testing.T) {
diff --git a/tsdb/isolation.go b/tsdb/isolation.go
index 12a9bfe92..cee6899dd 100644
--- a/tsdb/isolation.go
+++ b/tsdb/isolation.go
@@ -38,17 +38,30 @@ func (i *isolationState) Close() {
 	i.prev.next = i.next
 }
 
+// isolationAppender is a node in the circular doubly-linked list of open appends,
+// which is kept ordered by increasing appendID.
+type isolationAppender struct {
+	appendID uint64
+	prev     *isolationAppender
+	next     *isolationAppender
+}
+
 // isolation is the global isolation state.
 type isolation struct {
-	// Mutex for accessing lastAppendID and appendsOpen.
-	appendMtx sync.Mutex
-	// Each append is given an internal id.
-	lastAppendID uint64
+	// Mutex for accessing appendsOpen and appendsOpenList (which also holds the last appendID).
+	appendMtx sync.RWMutex
 	// Which appends are currently in progress.
-	appendsOpen map[uint64]struct{}
+	appendsOpen map[uint64]*isolationAppender
+	// Open appenders, linked in order of increasing appendID. appendsOpenList itself is a sentinel
+	// whose appendID holds the last issued appendID; appendsOpenList.next is the first (lowest) appender
+	// and appendsOpenList.prev the last (highest). With no appenders, both point back to appendsOpenList.
+	appendsOpenList *isolationAppender
+	// Pool of reusable *isolationAppender to save on allocations.
+	appendersPool sync.Pool
+
 	// Mutex for accessing readsOpen.
 	// If taking both appendMtx and readMtx, take appendMtx first.
-	readMtx sync.Mutex
+	readMtx sync.RWMutex
 	// All current in use isolationStates. This is a doubly-linked list.
 	readsOpen *isolationState
 }
@@ -58,47 +71,46 @@ func newIsolation() *isolation {
 	isoState.next = isoState
 	isoState.prev = isoState
 
+	appender := &isolationAppender{}
+	appender.next = appender
+	appender.prev = appender
+
 	return &isolation{
-		appendsOpen: map[uint64]struct{}{},
-		readsOpen:   isoState,
+		appendsOpen:     map[uint64]*isolationAppender{},
+		appendsOpenList: appender,
+		readsOpen:       isoState,
+		appendersPool:   sync.Pool{New: func() interface{} { return &isolationAppender{} }},
 	}
 }
 
 // lowWatermark returns the appendID below which we no longer need to track
 // which appends were from which appendID.
 func (i *isolation) lowWatermark() uint64 {
-	i.appendMtx.Lock() // Take appendMtx first.
-	defer i.appendMtx.Unlock()
-	i.readMtx.Lock()
-	defer i.readMtx.Unlock()
+	i.appendMtx.RLock() // Take appendMtx first.
+	defer i.appendMtx.RUnlock()
+	i.readMtx.RLock()
+	defer i.readMtx.RUnlock()
 	if i.readsOpen.prev != i.readsOpen {
 		return i.readsOpen.prev.lowWatermark
 	}
-	lw := i.lastAppendID
-	for k := range i.appendsOpen {
-		if k < lw {
-			lw = k
-		}
-	}
-	return lw
+
+	// Lowest appendID from appenders, or lastAppendID.
+	return i.appendsOpenList.next.appendID
 }
 
 // State returns an object used to control isolation
 // between a query and appends. Must be closed when complete.
 func (i *isolation) State() *isolationState {
-	i.appendMtx.Lock() // Take append mutex before read mutex.
-	defer i.appendMtx.Unlock()
+	i.appendMtx.RLock() // Take append mutex before read mutex.
+	defer i.appendMtx.RUnlock()
 	isoState := &isolationState{
-		maxAppendID:       i.lastAppendID,
-		lowWatermark:      i.lastAppendID,
+		maxAppendID:       i.appendsOpenList.appendID,
+		lowWatermark:      i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendID.
 		incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)),
 		isolation:         i,
 	}
 	for k := range i.appendsOpen {
 		isoState.incompleteAppends[k] = struct{}{}
-		if k < isoState.lowWatermark {
-			isoState.lowWatermark = k
-		}
 	}
 
 	i.readMtx.Lock()
@@ -115,15 +127,47 @@ func (i *isolation) State() *isolationState {
 func (i *isolation) newAppendID() uint64 {
 	i.appendMtx.Lock()
 	defer i.appendMtx.Unlock()
-	i.lastAppendID++
-	i.appendsOpen[i.lastAppendID] = struct{}{}
-	return i.lastAppendID
+
+	// Last used appendID is stored in head element.
+	i.appendsOpenList.appendID++
+
+	app := i.appendersPool.Get().(*isolationAppender)
+	app.appendID = i.appendsOpenList.appendID
+	app.prev = i.appendsOpenList.prev
+	app.next = i.appendsOpenList
+
+	i.appendsOpenList.prev.next = app
+	i.appendsOpenList.prev = app
+
+	i.appendsOpen[app.appendID] = app
+	return app.appendID
+}
+
+// lastAppendID returns the highest appendID that has been given out so far.
+func (i *isolation) lastAppendID() uint64 {
+	i.appendMtx.RLock()
+	defer i.appendMtx.RUnlock()
+
+	return i.appendsOpenList.appendID
 }
 
+// closeAppend marks the append with the given ID as finished. IDs that are not
+// currently open are ignored.
 func (i *isolation) closeAppend(appendID uint64) {
 	i.appendMtx.Lock()
 	defer i.appendMtx.Unlock()
-	delete(i.appendsOpen, appendID)
+
+	app := i.appendsOpen[appendID]
+	if app != nil {
+		app.prev.next = app.next
+		app.next.prev = app.prev
+
+		delete(i.appendsOpen, appendID)
+
+		// Clear all fields, and return to the pool.
+		*app = isolationAppender{}
+		i.appendersPool.Put(app)
+	}
 }
 
 // The transactionID ring buffer.
diff --git a/tsdb/isolation_test.go b/tsdb/isolation_test.go
new file mode 100644
index 000000000..3a90547b2
--- /dev/null
+++ b/tsdb/isolation_test.go
@@ -0,0 +1,107 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tsdb
+
+import (
+	"strconv"
+	"sync"
+	"testing"
+)
+
+// BenchmarkIsolation measures newAppendID, lowWatermark and closeAppend when
+// called from 10 to 10000 concurrent goroutines. Each goroutine performs b.N
+// iterations, so a single reported op spans one iteration in every goroutine.
+func BenchmarkIsolation(b *testing.B) {
+	for _, goroutines := range []int{10, 100, 1000, 10000} {
+		b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
+			iso := newIsolation()
+
+			wg := sync.WaitGroup{}
+			start := make(chan struct{})
+
+			for g := 0; g < goroutines; g++ {
+				wg.Add(1)
+
+				go func() {
+					defer wg.Done()
+					<-start
+
+					for i := 0; i < b.N; i++ {
+						appendID := iso.newAppendID()
+						_ = iso.lowWatermark()
+
+						iso.closeAppend(appendID)
+					}
+				}()
+			}
+
+			b.ResetTimer()
+			close(start)
+			wg.Wait()
+		})
+	}
+}
+
+// BenchmarkIsolationWithState runs the same append workload as BenchmarkIsolation
+// while reader goroutines concurrently open and close isolation states.
+func BenchmarkIsolationWithState(b *testing.B) {
+	for _, goroutines := range []int{10, 100, 1000, 10000} {
+		b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
+			iso := newIsolation()
+
+			wg := sync.WaitGroup{}
+			start := make(chan struct{})
+
+			for g := 0; g < goroutines; g++ {
+				wg.Add(1)
+
+				go func() {
+					defer wg.Done()
+					<-start
+
+					for i := 0; i < b.N; i++ {
+						appendID := iso.newAppendID()
+						_ = iso.lowWatermark()
+
+						iso.closeAppend(appendID)
+					}
+				}()
+			}
+
+			// One reader per 100 appending goroutines, but never fewer than one.
+			readers := goroutines / 100
+			if readers == 0 {
+				readers++
+			}
+
+			for g := 0; g < readers; g++ {
+				wg.Add(1)
+
+				go func() {
+					defer wg.Done()
+					<-start
+
+					for i := 0; i < b.N; i++ {
+						s := iso.State()
+						s.Close()
+					}
+				}()
+			}
+
+			b.ResetTimer()
+			close(start)
+			wg.Wait()
+		})
+	}
+}