From 7e6a03fbf9f6f5dcf98884fd98d2b1f8d4398692 Mon Sep 17 00:00:00 2001
From: Bjoern Rabenstein
Date: Mon, 6 Oct 2014 15:58:12 +0200
Subject: [PATCH] Fix a few concurrency issues before starting to use the new
 fp locker.

Change-Id: I8615e8816e79ef0882e123163ee590c739b79d12
---
 storage/local/interface.go   |  6 +++++
 storage/local/locker.go      | 10 +++++++--
 storage/local/locker_test.go |  6 ++---
 storage/local/persistence.go |  7 +++---
 storage/local/series.go      | 18 +++++++++++++--
 storage/local/storage.go     | 43 ++++++++++++++++++++++++++----------
 6 files changed, 67 insertions(+), 23 deletions(-)

diff --git a/storage/local/interface.go b/storage/local/interface.go
index de053c2aca..56cc2b51a8 100644
--- a/storage/local/interface.go
+++ b/storage/local/interface.go
@@ -21,6 +21,12 @@ import (
 // SeriesMap maps fingerprints to memory series.
 type SeriesMap map[clientmodel.Fingerprint]*memorySeries
 
+// FingerprintSeriesPair is a fingerprint paired with a memory series.
+type FingerprintSeriesPair struct {
+	Fingerprint clientmodel.Fingerprint
+	Series      *memorySeries
+}
+
 // Storage ingests and manages samples, along with various indexes. All methods
 // are goroutine-safe.
 type Storage interface {
diff --git a/storage/local/locker.go b/storage/local/locker.go
index 23e5e4849d..bbac094e05 100644
--- a/storage/local/locker.go
+++ b/storage/local/locker.go
@@ -17,6 +17,12 @@ type fingerprintLock struct {
 // fingerprintLocker allows locking individual fingerprints in such a manner
 // that the lock only exists and uses memory while it is being held (or waiting
 // to be acquired) by at least one party.
+//
+// TODO: This could be implemented as just a fixed number n of locks, assigned
+// based on the fingerprint % n. There can be collisions, but they would
+// statistically rarely matter (if n is much larger than the number of
+// goroutines requiring locks concurrently). The only problem is locking of
+// two different fingerprints by the same goroutine.
 type fingerprintLocker struct {
 	mtx     sync.Mutex
 	fpLocks map[clientmodel.Fingerprint]*fingerprintLock
@@ -24,8 +30,8 @@ type fingerprintLocker struct {
 }
 
 // newFingerprintLocker returns a new fingerprintLocker ready for use.
-func newFingerprintLocker() *fingerprintLocker {
-	lockPool := make([]*fingerprintLock, 100)
+func newFingerprintLocker(preallocatedMutexes int) *fingerprintLocker {
+	lockPool := make([]*fingerprintLock, preallocatedMutexes)
 	for i := range lockPool {
 		lockPool[i] = &fingerprintLock{}
 	}
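Note on the locker.go TODO above: the fixed-number-of-locks alternative it
describes is simple enough to illustrate. The following is a standalone
sketch of that design, not code from this patch; modNLocker and the bare
uint64-based Fingerprint are hypothetical stand-ins (the real
clientmodel.Fingerprint is likewise backed by a uint64).

    package main

    import (
    	"fmt"
    	"sync"
    )

    type Fingerprint uint64

    // modNLocker trades exactness for constant memory: it pre-allocates n
    // mutexes and assigns fingerprint fp to mutex fp % n. Two distinct
    // fingerprints may collide on the same mutex, which is statistically
    // harmless if n is much larger than the number of goroutines locking
    // concurrently. The hard constraint is the one the TODO names: a
    // goroutine must never hold locks for two fingerprints at once, since
    // a collision between them would self-deadlock.
    type modNLocker struct {
    	mutexes []sync.Mutex
    }

    func newModNLocker(n int) *modNLocker {
    	return &modNLocker{mutexes: make([]sync.Mutex, n)}
    }

    func (l *modNLocker) Lock(fp Fingerprint) {
    	l.mutexes[uint64(fp)%uint64(len(l.mutexes))].Lock()
    }

    func (l *modNLocker) Unlock(fp Fingerprint) {
    	l.mutexes[uint64(fp)%uint64(len(l.mutexes))].Unlock()
    }

    func main() {
    	l := newModNLocker(1024)
    	l.Lock(42)
    	fmt.Println("holding the lock for fingerprint 42")
    	l.Unlock(42)
    }

Unlike the map-based fingerprintLocker, this variant allocates nothing per
lock operation and needs no bookkeeping mutex, at the cost of occasional
false contention between colliding fingerprints.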
diff --git a/storage/local/locker_test.go b/storage/local/locker_test.go
index 57cf50208e..0a025f2f6a 100644
--- a/storage/local/locker_test.go
+++ b/storage/local/locker_test.go
@@ -7,13 +7,11 @@ import (
 	clientmodel "github.com/prometheus/client_golang/model"
 )
 
-var httpServerStarted bool
-
 func BenchmarkFingerprintLockerParallel(b *testing.B) {
 	numGoroutines := 10
 	numFingerprints := 10
 	numLockOps := b.N
-	locker := newFingerprintLocker()
+	locker := newFingerprintLocker(100)
 	wg := sync.WaitGroup{}
 
 	b.ResetTimer()
@@ -36,7 +34,7 @@ func BenchmarkFingerprintLockerParallel(b *testing.B) {
 
 func BenchmarkFingerprintLockerSerial(b *testing.B) {
 	numFingerprints := 10
-	locker := newFingerprintLocker()
+	locker := newFingerprintLocker(100)
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 128544e6a6..a3e50b14f5 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -50,9 +50,10 @@ const (
 	chunkHeaderFirstTimeOffset = 1
 	chunkHeaderLastTimeOffset  = 9
 
-	indexingMaxBatchSize  = 1024
-	indexingBatchTimeout  = 500 * time.Millisecond   // Commit batch when idle for that long.
-	indexingQueueCapacity = 10 * indexingMaxBatchSize // TODO: Export as metric.
+	// TODO: Consider making any of these configurable?
+	indexingMaxBatchSize  = 1024 * 1024
+	indexingBatchTimeout  = 500 * time.Millisecond // Commit batch when idle for that long.
+	indexingQueueCapacity = 1024
 )
 
 const (
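Note on the persistence.go constants above: the comment on
indexingBatchTimeout says a batch is committed when the queue has been idle
for that long. A minimal sketch of such a size-or-idle-timeout batching loop
follows, assuming a hypothetical indexingOp type and commitBatch function;
it illustrates the pattern, not the actual persistence code.

    package main

    import (
    	"fmt"
    	"time"
    )

    type indexingOp struct{ fingerprint uint64 }

    const (
    	maxBatchSize = 1024 * 1024
    	batchTimeout = 500 * time.Millisecond
    )

    func commitBatch(batch []indexingOp) {
    	fmt.Printf("committing batch of %d ops\n", len(batch))
    }

    func indexingLoop(queue <-chan indexingOp) {
    	var batch []indexingOp
    	for {
    		select {
    		case op, ok := <-queue:
    			if !ok { // Queue closed: flush and stop.
    				if len(batch) > 0 {
    					commitBatch(batch)
    				}
    				return
    			}
    			batch = append(batch, op)
    			if len(batch) >= maxBatchSize {
    				commitBatch(batch)
    				batch = nil
    			}
    		case <-time.After(batchTimeout):
    			// Queue idle for batchTimeout: commit what we have.
    			if len(batch) > 0 {
    				commitBatch(batch)
    				batch = nil
    			}
    		}
    	}
    }

    func main() {
    	queue := make(chan indexingOp, 1024) // cf. indexingQueueCapacity
    	go indexingLoop(queue)
    	for i := uint64(0); i < 10; i++ {
    		queue <- indexingOp{fingerprint: i}
    	}
    	close(queue)
    	time.Sleep(time.Second)
    }

Because the select re-arms the timer on every received op, the timeout
measures idle time since the last op, matching the comment's semantics.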
diff --git a/storage/local/series.go b/storage/local/series.go
index 0d0d438893..4cff464612 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -119,8 +119,9 @@ type memorySeries struct {
 	// (or all) chunkDescs are only on disk. These chunks are all contiguous
 	// and at the tail end.
 	chunkDescsLoaded bool
-	// Whether the current head chunk has already been persisted. If true,
-	// the current head chunk must not be modified anymore.
+	// Whether the current head chunk has already been persisted (or at
+	// least has been scheduled to be persisted). If true, the current head
+	// chunk must not be modified anymore.
 	headChunkPersisted bool
 }
 
@@ -171,6 +172,19 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
 	}
 }
 
+func (s *memorySeries) persistHeadChunk(fp clientmodel.Fingerprint, persistQueue chan *persistRequest) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	if s.headChunkPersisted {
+		return
+	}
+	s.headChunkPersisted = true
+	persistQueue <- &persistRequest{
+		fingerprint: fp,
+		chunkDesc:   s.head(),
+	}
+}
+
 func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool) {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 47f2345c4b..ce4fddbd47 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -201,26 +201,45 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
 }
 
 func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
-	s.mtx.RLock()
-	defer s.mtx.RUnlock()
+	fspsToArchive := []FingerprintSeriesPair{}
 	defer func(begin time.Time) {
 		evictionDuration.Set(float64(time.Since(begin) / time.Millisecond))
 	}(time.Now())
 
+	s.mtx.RLock()
 	for fp, series := range s.fingerprintToSeries {
 		if series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) {
-			if err := s.persistence.ArchiveMetric(
-				fp, series.metric, series.firstTime(), series.lastTime(),
-			); err != nil {
-				glog.Errorf("Error archiving metric %v: %v", series.metric, err)
-			}
-			delete(s.fingerprintToSeries, fp)
-			s.persistQueue <- &persistRequest{
-				fingerprint: fp,
-				chunkDesc:   series.head(),
-			}
+			fspsToArchive = append(fspsToArchive, FingerprintSeriesPair{
+				Fingerprint: fp,
+				Series:      series,
+			})
 		}
+		series.persistHeadChunk(fp, s.persistQueue)
+	}
+	s.mtx.RUnlock()
+
+	if len(fspsToArchive) == 0 {
+		return
+	}
+
+	// If we are here, we have metrics to archive. For that, we need the write lock.
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	for _, fsp := range fspsToArchive {
+		// TODO: Need series lock (or later FP lock)?
+		if !fsp.Series.headChunkPersisted {
+			// Oops. The series has received new samples all of a
+			// sudden, giving it a new head chunk. Leave it alone.
+			return
+		}
+		if err := s.persistence.ArchiveMetric(
+			fsp.Fingerprint, fsp.Series.metric, fsp.Series.firstTime(), fsp.Series.lastTime(),
+		); err != nil {
+			glog.Errorf("Error archiving metric %v: %v", fsp.Series.metric, err)
+		}
+		delete(s.fingerprintToSeries, fsp.Fingerprint)
 	}
 }
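Note on the storage.go change above: evictMemoryChunks now follows a
collect-then-revalidate locking pattern, because archiving requires the
write lock but scanning every series under it would block ingestion. The
sketch below isolates that pattern with hypothetical, simplified types
(store, series, and a stale flag standing in for headChunkPersisted); it
is not the patch's code.

    package main

    import (
    	"fmt"
    	"sync"
    )

    type series struct {
    	stale bool // stands in for headChunkPersisted remaining true
    }

    type store struct {
    	mtx    sync.RWMutex
    	series map[uint64]*series
    }

    func (s *store) dropStale() {
    	// Phase 1: collect candidates under the cheap read lock.
    	var candidates []uint64
    	s.mtx.RLock()
    	for fp, sr := range s.series {
    		if sr.stale {
    			candidates = append(candidates, fp)
    		}
    	}
    	s.mtx.RUnlock()

    	if len(candidates) == 0 {
    		return
    	}

    	// Phase 2: mutate under the write lock. Between RUnlock and Lock,
    	// another goroutine may have changed a candidate (e.g. new samples
    	// arrived), so each condition must be re-checked before acting.
    	s.mtx.Lock()
    	defer s.mtx.Unlock()
    	for _, fp := range candidates {
    		if sr, ok := s.series[fp]; ok && sr.stale {
    			delete(s.series, fp)
    		}
    	}
    }

    func main() {
    	s := &store{series: map[uint64]*series{
    		1: {stale: true},
    		2: {stale: false},
    	}}
    	s.dropStale()
    	fmt.Println("series left:", len(s.series)) // series left: 1
    }

The re-check in phase 2 plays the same role as the patch's
"if !fsp.Series.headChunkPersisted" test: a series that received new
samples in the unlocked window grows a fresh head chunk and must not be
archived.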