Fix and improve the fp locker.
Benchmark:

$ go test -bench 'Fingerprint' -test.run 'Fingerprint' -test.cpu=1,2,4

OLD
BenchmarkFingerprintLockerParallel      500000      3618 ns/op
BenchmarkFingerprintLockerParallel-2    100000     12257 ns/op
BenchmarkFingerprintLockerParallel-4    500000     10164 ns/op
BenchmarkFingerprintLockerSerial      10000000       283 ns/op
BenchmarkFingerprintLockerSerial-2    10000000       284 ns/op
BenchmarkFingerprintLockerSerial-4    10000000       288 ns/op

NEW
BenchmarkFingerprintLockerParallel     1000000      1018 ns/op
BenchmarkFingerprintLockerParallel-2   1000000      1164 ns/op
BenchmarkFingerprintLockerParallel-4   2000000       910 ns/op
BenchmarkFingerprintLockerSerial      50000000      56.0 ns/op
BenchmarkFingerprintLockerSerial-2    50000000      47.9 ns/op
BenchmarkFingerprintLockerSerial-4    50000000      54.5 ns/op

Change-Id: I3c65a43822840e7e64c3c3cfe759e1de51272581
This commit is contained in:
parent 7ad55ef83c
commit 7f5d3c2c29
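The benchmark numbers above compare the old map-plus-refcount locker with the new fixed-pool design, both visible in the first storage hunk below. The following is a sketch of how such a benchmark can be written — it is not the upstream benchmark file; the names, the pool size of 4096, and the harness details are illustrative, and the locker is a reduced copy of the one introduced in the diff below:

package locker

import (
    "sync"
    "sync/atomic"
    "testing"
)

// fingerprint stands in for clientmodel.Fingerprint.
type fingerprint uint64

// A reduced copy of the new locker (see the storage hunk below),
// so that this sketch compiles on its own.
type fingerprintLocker struct {
    fpMtxs    []sync.Mutex
    numFpMtxs uint
}

func newFingerprintLocker(n int) *fingerprintLocker {
    return &fingerprintLocker{make([]sync.Mutex, n), uint(n)}
}

func (l *fingerprintLocker) Lock(fp fingerprint)   { l.fpMtxs[uint(fp)%l.numFpMtxs].Lock() }
func (l *fingerprintLocker) Unlock(fp fingerprint) { l.fpMtxs[uint(fp)%l.numFpMtxs].Unlock() }

// Each parallel worker hammers its own fingerprint, so contention
// arises only from stripes shared between workers.
func BenchmarkFingerprintLockerParallel(b *testing.B) {
    l := newFingerprintLocker(4096)
    var next uint64
    b.RunParallel(func(pb *testing.PB) {
        fp := fingerprint(atomic.AddUint64(&next, 1)) // distinct fp per worker
        for pb.Next() {
            l.Lock(fp)
            l.Unlock(fp)
        }
    })
}

// The serial variant measures the raw cost of one Lock/Unlock pair.
func BenchmarkFingerprintLockerSerial(b *testing.B) {
    l := newFingerprintLocker(4096)
    for i := 0; i < b.N; i++ {
        fp := fingerprint(i)
        l.Lock(fp)
        l.Unlock(fp)
    }
}

Running it with the command from the commit message (-test.cpu=1,2,4) produces the 1-, 2-, and 4-core rows shown above.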
@@ -78,7 +78,7 @@ func (p *TargetPool) Run() {
     }
 }
 
-func (p TargetPool) Stop() {
+func (p *TargetPool) Stop() {
     stopped := make(chan bool)
     p.done <- stopped
     <-stopped
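The body of Stop is a common Go shutdown handshake: hand the run loop a fresh channel, then block until the loop acknowledges on it. The hunk's actual change is the receiver — a value receiver copies the whole TargetPool on every call, which is at best wasteful and at worst incorrect if the struct carries locks or other state (not visible in this hunk). A minimal self-contained sketch of the handshake, with pool as a hypothetical stand-in for TargetPool:

package main

import "fmt"

// pool is a hypothetical stand-in for TargetPool; only the
// shutdown handshake is modeled here.
type pool struct {
    done chan chan bool
}

// run is the owner goroutine's loop. On a shutdown request it
// acknowledges on the provided channel and exits.
func (p *pool) run() {
    // ... do work until asked to stop ...
    stopped := <-p.done
    stopped <- true
}

// stop mirrors TargetPool.Stop: send a fresh ack channel, then
// block until the run loop confirms it has exited.
func (p *pool) stop() {
    stopped := make(chan bool)
    p.done <- stopped
    <-stopped
}

func main() {
    p := &pool{done: make(chan chan bool)}
    go p.run()
    p.stop()
    fmt.Println("run loop has exited")
}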
@@ -52,7 +52,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector {
     return vector
 }
 
-func storeMatrix(storage storage_ng.Storage, matrix ast.Matrix) {
+func storeMatrix(storage local.Storage, matrix ast.Matrix) {
     pendingSamples := clientmodel.Samples{}
     for _, sampleSet := range matrix {
         for _, sample := range sampleSet.Values {
@@ -52,8 +52,8 @@ func vectorComparisonString(expected []string, actual []string) string {
         separator)
 }
 
-func newTestStorage(t testing.TB) (storage storage_ng.Storage, closer test.Closer) {
-    storage, closer = storage_ng.NewTestStorage(t)
+func newTestStorage(t testing.TB) (storage local.Storage, closer test.Closer) {
+    storage, closer = local.NewTestStorage(t)
     storeMatrix(storage, testMatrix)
     return storage, closer
 }
@@ -6,93 +6,38 @@ import (
     clientmodel "github.com/prometheus/client_golang/model"
 )
 
-// fingerprintLock allows locking exactly one fingerprint. When refCount is 0
-// after the mutex is unlocked, the fingerprintLock is discarded from the
-// fingerprintLocker.
-type fingerprintLock struct {
-    sync.Mutex
-    refCount int
-}
-
-// fingerprintLocker allows locking individual fingerprints in such a manner
-// that the lock only exists and uses memory while it is being held (or waiting
-// to be acquired) by at least one party.
-//
-// TODO: This could be implemented as just a fixed number n of locks, assigned
-// based on the fingerprint % n. There can be collisons, but they would
-// statistically rarely matter (if n is much larger than the number of
-// goroutines requiring locks concurrently). Only problem is locking of two
-// different fingerprints by the same goroutine.
+// fingerprintLocker allows locking individual fingerprints. To limit the number
+// of mutexes needed for that, only a fixed number of mutexes are
+// allocated. Fingerprints to be locked are assigned to those pre-allocated
+// mutexes by their value. (Note that fingerprints are calculated by a hash
+// function, so that an approximately equal distribution over the mutexes is
+// expected, even without additional hashing of the fingerprint value.)
+// Collisions are not detected. If two fingerprints get assigned to the same
+// mutex, only one of them can be locked at the same time. As long as the number
+// of pre-allocated mutexes is much larger than the number of goroutines
+// requiring a fingerprint lock concurrently, the loss in efficiency is
+// small. However, a goroutine must never lock more than one fingerprint at the
+// same time. (In that case a collision would try to acquire the same mutex
+// twice).
 type fingerprintLocker struct {
-    mtx        sync.Mutex
-    fpLocks    map[clientmodel.Fingerprint]*fingerprintLock
-    fpLockPool []*fingerprintLock
+    fpMtxs    []sync.Mutex
+    numFpMtxs uint
 }
 
 // newFingerprintLocker returns a new fingerprintLocker ready for use.
 func newFingerprintLocker(preallocatedMutexes int) *fingerprintLocker {
-    lockPool := make([]*fingerprintLock, preallocatedMutexes)
-    for i := range lockPool {
-        lockPool[i] = &fingerprintLock{}
-    }
     return &fingerprintLocker{
-        fpLocks:    map[clientmodel.Fingerprint]*fingerprintLock{},
-        fpLockPool: lockPool,
+        make([]sync.Mutex, preallocatedMutexes),
+        uint(preallocatedMutexes),
     }
 }
 
-// getLock either returns an existing fingerprintLock from a pool, or allocates
-// a new one if the pool is depleted.
-func (l *fingerprintLocker) getLock() *fingerprintLock {
-    return l.fpLockPool[0]
-    if len(l.fpLockPool) == 0 {
-        return &fingerprintLock{}
-    }
-
-    lock := l.fpLockPool[len(l.fpLockPool)-1]
-    l.fpLockPool = l.fpLockPool[:len(l.fpLockPool)-1]
-    return lock
-}
-
-// putLock either stores a fingerprintLock back in the pool, or throws it away
-// if the pool is full.
-func (l *fingerprintLocker) putLock(fpl *fingerprintLock) {
-    if len(l.fpLockPool) == cap(l.fpLockPool) {
-        return
-    }
-
-    l.fpLockPool = l.fpLockPool[:len(l.fpLockPool)+1]
-    l.fpLockPool[len(l.fpLockPool)-1] = fpl
-}
-
 // Lock locks the given fingerprint.
 func (l *fingerprintLocker) Lock(fp clientmodel.Fingerprint) {
-    l.mtx.Lock()
-
-    fpLock, ok := l.fpLocks[fp]
-    if ok {
-        fpLock.refCount++
-    } else {
-        fpLock = l.getLock()
-        l.fpLocks[fp] = fpLock
-    }
-
-    l.mtx.Unlock()
-    fpLock.Lock()
+    l.fpMtxs[uint(fp)%l.numFpMtxs].Lock()
 }
 
 // Unlock unlocks the given fingerprint.
 func (l *fingerprintLocker) Unlock(fp clientmodel.Fingerprint) {
-    l.mtx.Lock()
-    defer l.mtx.Unlock()
-
-    fpLock := l.fpLocks[fp]
-    fpLock.Unlock()
-
-    if fpLock.refCount == 0 {
-        delete(l.fpLocks, fp)
-        l.putLock(fpLock)
-    } else {
-        fpLock.refCount--
-    }
+    l.fpMtxs[uint(fp)%l.numFpMtxs].Unlock()
 }
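The rewritten locker is classic lock striping: a fixed slice of mutexes, indexed by fingerprint modulo the slice length, replaces the central mutex, map lookup, refcount, and free-pool bookkeeping of the old code — hence the roughly 3-5x benchmark improvements in the commit message. It also removes the bug visible in the deleted getLock above, whose unconditional first return appears to hand fpLockPool[0] to every caller. A self-contained usage sketch, where fingerprint and stripedLocker are stand-in names for clientmodel.Fingerprint and fingerprintLocker:

package main

import (
    "fmt"
    "sync"
)

// fingerprint stands in for clientmodel.Fingerprint (a uint64 hash).
type fingerprint uint64

// stripedLocker mirrors the new fingerprintLocker from the hunk above.
type stripedLocker struct {
    mtxs []sync.Mutex
    n    uint
}

func newStripedLocker(n int) *stripedLocker {
    return &stripedLocker{make([]sync.Mutex, n), uint(n)}
}

func (s *stripedLocker) Lock(fp fingerprint)   { s.mtxs[uint(fp)%s.n].Lock() }
func (s *stripedLocker) Unlock(fp fingerprint) { s.mtxs[uint(fp)%s.n].Unlock() }

func main() {
    l := newStripedLocker(1024)
    counters := make([]int, 8)
    var wg sync.WaitGroup
    for i := 0; i < 8000; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            fp := fingerprint(i % 8)
            l.Lock(fp) // goroutines sharing a stripe serialize here
            counters[fp]++
            l.Unlock(fp)
        }(i)
    }
    wg.Wait()
    fmt.Println(counters) // [1000 1000 1000 1000 1000 1000 1000 1000]
}

The trade-off is spelled out in the new doc comment: collisions go undetected, so a goroutine must never hold two fingerprint locks at once — if both mapped to the same stripe, it would deadlock trying to acquire the same mutex twice.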
@@ -358,7 +358,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
 // - Make sure the length of the seriesMap doesn't change during the runtime.
 // - Lock the fingerprints while persisting unpersisted head chunks.
 // - Write to temporary file and only rename after successfully finishing.
-func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries seriesMap) error {
+func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries *seriesMap) error {
     f, err := os.OpenFile(p.headsPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
     if err != nil {
         return err
@@ -431,54 +431,54 @@ func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries seriesMap) er
 // open (non-full) head chunks. Only call this method during start-up while
 // nothing else is running in storage land. This method is utterly
 // goroutine-unsafe.
-func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) {
+func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
     f, err := os.Open(p.headsPath())
     if os.IsNotExist(err) {
         return newSeriesMap(), nil
     }
     if err != nil {
-        return seriesMap{}, err
+        return nil, err
     }
     defer f.Close()
     r := bufio.NewReaderSize(f, fileBufSize)
 
     buf := make([]byte, len(headsMagicString))
     if _, err := io.ReadFull(r, buf); err != nil {
-        return seriesMap{}, err
+        return nil, err
     }
     magic := string(buf)
     if magic != headsMagicString {
-        return seriesMap{}, fmt.Errorf(
+        return nil, fmt.Errorf(
             "unexpected magic string, want %q, got %q",
             headsMagicString, magic,
         )
     }
     if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil {
-        return seriesMap{}, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
+        return nil, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
     }
     numSeries, err := binary.ReadVarint(r)
     if err != nil {
-        return seriesMap{}, err
+        return nil, err
     }
     fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries, numSeries)
 
     for ; numSeries > 0; numSeries-- {
         seriesFlags, err := r.ReadByte()
         if err != nil {
-            return seriesMap{}, err
+            return nil, err
         }
         headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
         fp, err := codable.DecodeUint64(r)
         if err != nil {
-            return seriesMap{}, err
+            return nil, err
         }
         var metric codable.Metric
         if err := metric.UnmarshalFromReader(r); err != nil {
-            return seriesMap{}, err
+            return nil, err
         }
         numChunkDescs, err := binary.ReadVarint(r)
         if err != nil {
-            return seriesMap{}, err
+            return nil, err
         }
         chunkDescs := make(chunkDescs, numChunkDescs)
 
@@ -486,11 +486,11 @@ func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) {
         if headChunkPersisted || i < numChunkDescs-1 {
             firstTime, err := binary.ReadVarint(r)
             if err != nil {
-                return seriesMap{}, err
+                return nil, err
             }
             lastTime, err := binary.ReadVarint(r)
             if err != nil {
-                return seriesMap{}, err
+                return nil, err
             }
             chunkDescs[i] = &chunkDesc{
                 firstTimeField: clientmodel.Timestamp(firstTime),
@@ -500,11 +500,11 @@ func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) {
             // Non-persisted head chunk.
             chunkType, err := r.ReadByte()
             if err != nil {
-                return seriesMap{}, err
+                return nil, err
             }
             chunk := chunkForType(chunkType)
             if err := chunk.unmarshal(r); err != nil {
-                return seriesMap{}, err
+                return nil, err
             }
             chunkDescs[i] = &chunkDesc{
                 chunk: chunk,
@@ -520,7 +520,7 @@ func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) {
             headChunkPersisted: headChunkPersisted,
         }
     }
-    return seriesMap{m: fingerprintToSeries}, nil
+    return &seriesMap{m: fingerprintToSeries}, nil
 }
 
 // dropChunks deletes all chunks from a series whose last sample time is before
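Most of the churn in loadSeriesMapAndHeads above is mechanical fallout from the signature change: returning *seriesMap instead of seriesMap lets every error path return nil rather than manufacturing a seriesMap{} zero value, and it stops copying the struct — including its embedded sync.RWMutex — by value on every return. A tiny illustration of the two shapes, with hypothetical names:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// table is a hypothetical reduced analogue of seriesMap.
type table struct {
    mtx sync.RWMutex
    m   map[uint64]string
}

// loadByValue is the old shape: error paths must fabricate a zero
// value, and every return copies the embedded RWMutex.
func loadByValue(fail bool) (table, error) {
    if fail {
        return table{}, errors.New("corrupt heads file") // hypothetical error
    }
    return table{m: map[uint64]string{}}, nil
}

// loadByPointer is the new shape: error paths simply return nil,
// and the mutex is never copied.
func loadByPointer(fail bool) (*table, error) {
    if fail {
        return nil, errors.New("corrupt heads file")
    }
    return &table{m: map[uint64]string{}}, nil
}

func main() {
    if _, err := loadByValue(true); err != nil {
        fmt.Println("old shape:", err)
    }
    if t, err := loadByPointer(true); err != nil {
        fmt.Println("new shape:", err, "result is nil:", t == nil)
    }
}

Modern go vet's copylocks check flags the by-value shape for exactly this reason.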
@@ -38,12 +38,12 @@ type seriesMap struct {
 
 // newSeriesMap returns a newly allocated empty seriesMap. To create a seriesMap
 // based on a prefilled map, use an explicit initializer.
-func newSeriesMap() seriesMap {
-    return seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)}
+func newSeriesMap() *seriesMap {
+    return &seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)}
 }
 
 // length returns the number of mappings in the seriesMap.
-func (sm seriesMap) length() int {
+func (sm *seriesMap) length() int {
     sm.mtx.RLock()
     defer sm.mtx.RUnlock()
 
@@ -52,7 +52,7 @@ func (sm seriesMap) length() int {
 
 // get returns a memorySeries for a fingerprint. Return values have the same
 // semantics as the native Go map.
-func (sm seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) {
+func (sm *seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) {
     sm.mtx.RLock()
     defer sm.mtx.RUnlock()
 
@@ -61,7 +61,7 @@ func (sm seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) {
 }
 
 // put adds a mapping to the seriesMap.
-func (sm seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) {
+func (sm *seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) {
     sm.mtx.Lock()
     defer sm.mtx.Unlock()
 
@@ -69,7 +69,7 @@ func (sm seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) {
 }
 
 // del removes a mapping from the series Map.
-func (sm seriesMap) del(fp clientmodel.Fingerprint) {
+func (sm *seriesMap) del(fp clientmodel.Fingerprint) {
     sm.mtx.Lock()
     defer sm.mtx.Unlock()
 
@@ -83,7 +83,7 @@ func (sm seriesMap) del(fp clientmodel.Fingerprint) {
 // for iterating over a map with a 'range' clause. However, if the next element
 // in iteration order is removed after the current element has been received
 // from the channel, it will still be produced by the channel.
-func (sm seriesMap) iter() <-chan fingerprintSeriesPair {
+func (sm *seriesMap) iter() <-chan fingerprintSeriesPair {
     ch := make(chan fingerprintSeriesPair)
     go func() {
         sm.mtx.RLock()
@@ -105,7 +105,7 @@ func (sm seriesMap) iter() <-chan fingerprintSeriesPair {
 // for iterating over a map with a 'range' clause. However, if the next element
 // in iteration order is removed after the current element has been received
 // from the channel, it will still be produced by the channel.
-func (sm seriesMap) fpIter() <-chan clientmodel.Fingerprint {
+func (sm *seriesMap) fpIter() <-chan clientmodel.Fingerprint {
     ch := make(chan clientmodel.Fingerprint)
     go func() {
         sm.mtx.RLock()
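Like the other seriesMap methods, iter and fpIter switch to pointer receivers; with value receivers every call copied the struct and its embedded sync.RWMutex, so each invocation locked a private copy of the mutex. The doc comments above also pin down the iteration semantics: an element removed after its predecessor was received can still be delivered, which implies the lock is not held while the goroutine blocks on a send. A reduced sketch of that pattern — safeMap is a hypothetical stand-in, the real methods yield fingerprint/series pairs and fingerprints, and the goroutine bodies aren't shown in these hunks, so the unlock-around-send choreography below is an assumption that matches the documented semantics:

package main

import (
    "fmt"
    "sync"
)

// safeMap is a hypothetical reduced analogue of seriesMap.
type safeMap struct {
    mtx sync.RWMutex
    m   map[uint64]string
}

// iter streams keys over a channel. The read lock is dropped while
// the goroutine blocks on the send and re-acquired before the range
// advances, so the map stays writable between delivered elements.
func (sm *safeMap) iter() <-chan uint64 {
    ch := make(chan uint64)
    go func() {
        sm.mtx.RLock()
        for fp := range sm.m {
            sm.mtx.RUnlock()
            ch <- fp
            sm.mtx.RLock()
        }
        sm.mtx.RUnlock()
        close(ch)
    }()
    return ch
}

func main() {
    sm := &safeMap{m: map[uint64]string{1: "a", 2: "b", 3: "c"}}
    for fp := range sm.iter() {
        fmt.Println(fp)
    }
}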
@@ -41,7 +41,7 @@ type memorySeriesStorage struct {
     persistDone chan bool
     stopServing chan chan<- bool
 
-    fingerprintToSeries seriesMap
+    fingerprintToSeries *seriesMap
 
     memoryEvictionInterval time.Duration
     memoryRetentionPeriod  time.Duration
@@ -152,7 +152,7 @@ func TestTemplateExpansion(t *testing.T) {
 
     time := clientmodel.Timestamp(0)
 
-    storage, closer := storage_ng.NewTestStorage(t)
+    storage, closer := local.NewTestStorage(t)
     defer closer.Close()
     storage.AppendSamples(clientmodel.Samples{
         {