diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 89cc4d0aed..2b161b45e4 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -78,7 +78,7 @@ func (p *TargetPool) Run() { } } -func (p TargetPool) Stop() { +func (p *TargetPool) Stop() { stopped := make(chan bool) p.done <- stopped <-stopped diff --git a/rules/helpers_test.go b/rules/helpers_test.go index 69836978d3..72bfeb36c9 100644 --- a/rules/helpers_test.go +++ b/rules/helpers_test.go @@ -52,7 +52,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector { return vector } -func storeMatrix(storage storage_ng.Storage, matrix ast.Matrix) { +func storeMatrix(storage local.Storage, matrix ast.Matrix) { pendingSamples := clientmodel.Samples{} for _, sampleSet := range matrix { for _, sample := range sampleSet.Values { diff --git a/rules/rules_test.go b/rules/rules_test.go index 083f6dd094..d5e60e9716 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -52,8 +52,8 @@ func vectorComparisonString(expected []string, actual []string) string { separator) } -func newTestStorage(t testing.TB) (storage storage_ng.Storage, closer test.Closer) { - storage, closer = storage_ng.NewTestStorage(t) +func newTestStorage(t testing.TB) (storage local.Storage, closer test.Closer) { + storage, closer = local.NewTestStorage(t) storeMatrix(storage, testMatrix) return storage, closer } diff --git a/storage/local/locker.go b/storage/local/locker.go index bbac094e05..c0ac876543 100644 --- a/storage/local/locker.go +++ b/storage/local/locker.go @@ -6,93 +6,38 @@ import ( clientmodel "github.com/prometheus/client_golang/model" ) -// fingerprintLock allows locking exactly one fingerprint. When refCount is 0 -// after the mutex is unlocked, the fingerprintLock is discarded from the -// fingerprintLocker. -type fingerprintLock struct { - sync.Mutex - refCount int -} - -// fingerprintLocker allows locking individual fingerprints in such a manner -// that the lock only exists and uses memory while it is being held (or waiting -// to be acquired) by at least one party. -// -// TODO: This could be implemented as just a fixed number n of locks, assigned -// based on the fingerprint % n. There can be collisons, but they would -// statistically rarely matter (if n is much larger than the number of -// goroutines requiring locks concurrently). Only problem is locking of two -// different fingerprints by the same goroutine. +// fingerprintLocker allows locking individual fingerprints. To limit the number +// of mutexes needed for that, only a fixed number of mutexes are +// allocated. Fingerprints to be locked are assigned to those pre-allocated +// mutexes by their value. (Note that fingerprints are calculated by a hash +// function, so that an approximately equal distribution over the mutexes is +// expected, even without additional hashing of the fingerprint value.) +// Collisions are not detected. If two fingerprints get assigned to the same +// mutex, only one of them can be locked at the same time. As long as the number +// of pre-allocated mutexes is much larger than the number of goroutines +// requiring a fingerprint lock concurrently, the loss in efficiency is +// small. However, a goroutine must never lock more than one fingerprint at the +// same time. (In that case a collision would try to acquire the same mutex +// twice). type fingerprintLocker struct { - mtx sync.Mutex - fpLocks map[clientmodel.Fingerprint]*fingerprintLock - fpLockPool []*fingerprintLock + fpMtxs []sync.Mutex + numFpMtxs uint } // newFingerprintLocker returns a new fingerprintLocker ready for use. func newFingerprintLocker(preallocatedMutexes int) *fingerprintLocker { - lockPool := make([]*fingerprintLock, preallocatedMutexes) - for i := range lockPool { - lockPool[i] = &fingerprintLock{} - } return &fingerprintLocker{ - fpLocks: map[clientmodel.Fingerprint]*fingerprintLock{}, - fpLockPool: lockPool, + make([]sync.Mutex, preallocatedMutexes), + uint(preallocatedMutexes), } } -// getLock either returns an existing fingerprintLock from a pool, or allocates -// a new one if the pool is depleted. -func (l *fingerprintLocker) getLock() *fingerprintLock { - return l.fpLockPool[0] - if len(l.fpLockPool) == 0 { - return &fingerprintLock{} - } - - lock := l.fpLockPool[len(l.fpLockPool)-1] - l.fpLockPool = l.fpLockPool[:len(l.fpLockPool)-1] - return lock -} - -// putLock either stores a fingerprintLock back in the pool, or throws it away -// if the pool is full. -func (l *fingerprintLocker) putLock(fpl *fingerprintLock) { - if len(l.fpLockPool) == cap(l.fpLockPool) { - return - } - - l.fpLockPool = l.fpLockPool[:len(l.fpLockPool)+1] - l.fpLockPool[len(l.fpLockPool)-1] = fpl -} - // Lock locks the given fingerprint. func (l *fingerprintLocker) Lock(fp clientmodel.Fingerprint) { - l.mtx.Lock() - - fpLock, ok := l.fpLocks[fp] - if ok { - fpLock.refCount++ - } else { - fpLock = l.getLock() - l.fpLocks[fp] = fpLock - } - - l.mtx.Unlock() - fpLock.Lock() + l.fpMtxs[uint(fp)%l.numFpMtxs].Lock() } // Unlock unlocks the given fingerprint. func (l *fingerprintLocker) Unlock(fp clientmodel.Fingerprint) { - l.mtx.Lock() - defer l.mtx.Unlock() - - fpLock := l.fpLocks[fp] - fpLock.Unlock() - - if fpLock.refCount == 0 { - delete(l.fpLocks, fp) - l.putLock(fpLock) - } else { - fpLock.refCount-- - } + l.fpMtxs[uint(fp)%l.numFpMtxs].Unlock() } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 830dd94349..83e48e4170 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -358,7 +358,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie // - Make sure the length of the seriesMap doesn't change during the runtime. // - Lock the fingerprints while persisting unpersisted head chunks. // - Write to temporary file and only rename after successfully finishing. -func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries seriesMap) error { +func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries *seriesMap) error { f, err := os.OpenFile(p.headsPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) if err != nil { return err @@ -431,54 +431,54 @@ func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries seriesMap) er // open (non-full) head chunks. Only call this method during start-up while // nothing else is running in storage land. This method is utterly // goroutine-unsafe. -func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) { +func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) { f, err := os.Open(p.headsPath()) if os.IsNotExist(err) { return newSeriesMap(), nil } if err != nil { - return seriesMap{}, err + return nil, err } defer f.Close() r := bufio.NewReaderSize(f, fileBufSize) buf := make([]byte, len(headsMagicString)) if _, err := io.ReadFull(r, buf); err != nil { - return seriesMap{}, err + return nil, err } magic := string(buf) if magic != headsMagicString { - return seriesMap{}, fmt.Errorf( + return nil, fmt.Errorf( "unexpected magic string, want %q, got %q", headsMagicString, magic, ) } if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil { - return seriesMap{}, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion) + return nil, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion) } numSeries, err := binary.ReadVarint(r) if err != nil { - return seriesMap{}, err + return nil, err } fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries, numSeries) for ; numSeries > 0; numSeries-- { seriesFlags, err := r.ReadByte() if err != nil { - return seriesMap{}, err + return nil, err } headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0 fp, err := codable.DecodeUint64(r) if err != nil { - return seriesMap{}, err + return nil, err } var metric codable.Metric if err := metric.UnmarshalFromReader(r); err != nil { - return seriesMap{}, err + return nil, err } numChunkDescs, err := binary.ReadVarint(r) if err != nil { - return seriesMap{}, err + return nil, err } chunkDescs := make(chunkDescs, numChunkDescs) @@ -486,11 +486,11 @@ func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) { if headChunkPersisted || i < numChunkDescs-1 { firstTime, err := binary.ReadVarint(r) if err != nil { - return seriesMap{}, err + return nil, err } lastTime, err := binary.ReadVarint(r) if err != nil { - return seriesMap{}, err + return nil, err } chunkDescs[i] = &chunkDesc{ firstTimeField: clientmodel.Timestamp(firstTime), @@ -500,11 +500,11 @@ func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) { // Non-persisted head chunk. chunkType, err := r.ReadByte() if err != nil { - return seriesMap{}, err + return nil, err } chunk := chunkForType(chunkType) if err := chunk.unmarshal(r); err != nil { - return seriesMap{}, err + return nil, err } chunkDescs[i] = &chunkDesc{ chunk: chunk, @@ -520,7 +520,7 @@ func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) { headChunkPersisted: headChunkPersisted, } } - return seriesMap{m: fingerprintToSeries}, nil + return &seriesMap{m: fingerprintToSeries}, nil } // dropChunks deletes all chunks from a series whose last sample time is before diff --git a/storage/local/series.go b/storage/local/series.go index 4cc64d6e92..efc10e54e4 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -38,12 +38,12 @@ type seriesMap struct { // newSeriesMap returns a newly allocated empty seriesMap. To create a seriesMap // based on a prefilled map, use an explicit initializer. -func newSeriesMap() seriesMap { - return seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)} +func newSeriesMap() *seriesMap { + return &seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)} } // length returns the number of mappings in the seriesMap. -func (sm seriesMap) length() int { +func (sm *seriesMap) length() int { sm.mtx.RLock() defer sm.mtx.RUnlock() @@ -52,7 +52,7 @@ func (sm seriesMap) length() int { // get returns a memorySeries for a fingerprint. Return values have the same // semantics as the native Go map. -func (sm seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) { +func (sm *seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) { sm.mtx.RLock() defer sm.mtx.RUnlock() @@ -61,7 +61,7 @@ func (sm seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) { } // put adds a mapping to the seriesMap. -func (sm seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) { +func (sm *seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) { sm.mtx.Lock() defer sm.mtx.Unlock() @@ -69,7 +69,7 @@ func (sm seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) { } // del removes a mapping from the series Map. -func (sm seriesMap) del(fp clientmodel.Fingerprint) { +func (sm *seriesMap) del(fp clientmodel.Fingerprint) { sm.mtx.Lock() defer sm.mtx.Unlock() @@ -83,7 +83,7 @@ func (sm seriesMap) del(fp clientmodel.Fingerprint) { // for iterating over a map with a 'range' clause. However, if the next element // in iteration order is removed after the current element has been received // from the channel, it will still be produced by the channel. -func (sm seriesMap) iter() <-chan fingerprintSeriesPair { +func (sm *seriesMap) iter() <-chan fingerprintSeriesPair { ch := make(chan fingerprintSeriesPair) go func() { sm.mtx.RLock() @@ -105,7 +105,7 @@ func (sm seriesMap) iter() <-chan fingerprintSeriesPair { // for iterating over a map with a 'range' clause. However, if the next element // in iteration order is removed after the current element has been received // from the channel, it will still be produced by the channel. -func (sm seriesMap) fpIter() <-chan clientmodel.Fingerprint { +func (sm *seriesMap) fpIter() <-chan clientmodel.Fingerprint { ch := make(chan clientmodel.Fingerprint) go func() { sm.mtx.RLock() diff --git a/storage/local/storage.go b/storage/local/storage.go index 6b5d031dac..738dbbf612 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -41,7 +41,7 @@ type memorySeriesStorage struct { persistDone chan bool stopServing chan chan<- bool - fingerprintToSeries seriesMap + fingerprintToSeries *seriesMap memoryEvictionInterval time.Duration memoryRetentionPeriod time.Duration diff --git a/templates/templates_test.go b/templates/templates_test.go index 0b20e93b7c..ae817aa143 100644 --- a/templates/templates_test.go +++ b/templates/templates_test.go @@ -152,7 +152,7 @@ func TestTemplateExpansion(t *testing.T) { time := clientmodel.Timestamp(0) - storage, closer := storage_ng.NewTestStorage(t) + storage, closer := local.NewTestStorage(t) defer closer.Close() storage.AppendSamples(clientmodel.Samples{ {