diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index f30ff96200..04c9f5fc39 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -93,6 +93,8 @@ type stripeSeries struct { series []map[chunks.HeadSeriesRef]*memSeries hashes []seriesHashmap locks []stripeLock + + gcMut sync.Mutex } type stripeLock struct { @@ -120,8 +122,14 @@ func newStripeSeries(stripeSize int) *stripeSeries { // GC garbage collects old series that have not received a sample after mint // and will fully delete them. func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} { - deleted := map[chunks.HeadSeriesRef]struct{}{} + // NOTE(rfratto): GC will grab two locks, one for the hash and the other for + // series. It's not valid for any other function to grab both locks, + // otherwise a deadlock might occur when running GC in parallel with + // appending. + s.gcMut.Lock() + defer s.gcMut.Unlock() + deleted := map[chunks.HeadSeriesRef]struct{}{} for hashLock := 0; hashLock < s.size; hashLock++ { s.locks[hashLock].Lock() @@ -179,14 +187,17 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) { hashLock = hash & uint64(s.size-1) refLock = uint64(series.ref) & uint64(s.size-1) ) - s.locks[hashLock].Lock() - defer s.locks[hashLock].Unlock() - if hashLock != refLock { - s.locks[refLock].Lock() - defer s.locks[refLock].Unlock() - } - - s.hashes[hashLock].Set(hash, series) + // We can't hold both locks at once otherwise we might deadlock with a + // simulatenous call to GC. + // + // We update s.series first because GC expects anything in s.hashes to + // already exist in s.series. + s.locks[refLock].Lock() s.series[refLock][series.ref] = series + s.locks[refLock].Unlock() + + s.locks[hashLock].Lock() + s.hashes[hashLock].Set(hash, series) + s.locks[hashLock].Unlock() } diff --git a/tsdb/agent/series_test.go b/tsdb/agent/series_test.go new file mode 100644 index 0000000000..480f6c0263 --- /dev/null +++ b/tsdb/agent/series_test.go @@ -0,0 +1,76 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "fmt" + "math" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunks" +) + +func TestNoDeadlock(t *testing.T) { + const numWorkers = 1000 + + var ( + wg sync.WaitGroup + started = make(chan struct{}) + stripeSeries = newStripeSeries(3) + ) + + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func() { + defer wg.Done() + <-started + _ = stripeSeries.GC(math.MaxInt64) + }() + } + + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func(i int) { + defer wg.Done() + <-started + + series := &memSeries{ + ref: chunks.HeadSeriesRef(i), + lset: labels.FromMap(map[string]string{ + "id": fmt.Sprintf("%d", i), + }), + } + stripeSeries.Set(series.lset.Hash(), series) + }(i) + } + + finished := make(chan struct{}) + go func() { + wg.Wait() + close(finished) + }() + + close(started) + select { + case <-finished: + return + case <-time.After(15 * time.Second): + require.FailNow(t, "deadlock detected") + } +}