Fix a few concurrency issues before starting to use the new fp locker.

Change-Id: I8615e8816e79ef0882e123163ee590c739b79d12
Author: Bjoern Rabenstein
Date:   2014-10-06 15:58:12 +02:00
Parent: db92620163
Commit: 7e6a03fbf9

6 changed files with 67 additions and 23 deletions


@@ -21,6 +21,12 @@ import (
 // SeriesMap maps fingerprints to memory series.
 type SeriesMap map[clientmodel.Fingerprint]*memorySeries

+// FingerprintSeriesPair is a fingerprint paired with a memory series.
+type FingerprintSeriesPair struct {
+	Fingerprint clientmodel.Fingerprint
+	Series      *memorySeries
+}
+
 // Storage ingests and manages samples, along with various indexes. All methods
 // are goroutine-safe.
 type Storage interface {
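
For context, a pair type like this usually exists so a fingerprint and its series can travel together (in a slice or over a channel) after being read out of a locked map. A minimal sketch, assuming it lives in the same package as the types above; the package name and the pairsFromMap helper are illustrative, not part of the commit:

// Sketch only: assumes the same package as the diff above, where
// memorySeries and FingerprintSeriesPair are defined.
package local // assumed package name

import clientmodel "github.com/prometheus/client_golang/model"

// pairsFromMap flattens a fingerprint-to-series map into a slice of
// FingerprintSeriesPair values that can be processed later, outside any lock.
func pairsFromMap(m map[clientmodel.Fingerprint]*memorySeries) []FingerprintSeriesPair {
	pairs := make([]FingerprintSeriesPair, 0, len(m))
	for fp, series := range m {
		pairs = append(pairs, FingerprintSeriesPair{Fingerprint: fp, Series: series})
	}
	return pairs
}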


@@ -17,6 +17,12 @@ type fingerprintLock struct {
 // fingerprintLocker allows locking individual fingerprints in such a manner
 // that the lock only exists and uses memory while it is being held (or waiting
 // to be acquired) by at least one party.
+//
+// TODO: This could be implemented as just a fixed number n of locks, assigned
+// based on the fingerprint % n. There can be collisions, but they would
+// statistically rarely matter (if n is much larger than the number of
+// goroutines requiring locks concurrently). Only problem is locking of two
+// different fingerprints by the same goroutine.
 type fingerprintLocker struct {
 	mtx     sync.Mutex
 	fpLocks map[clientmodel.Fingerprint]*fingerprintLock
@@ -24,8 +30,8 @@ type fingerprintLocker struct {
 }

 // newFingerprintLocker returns a new fingerprintLocker ready for use.
-func newFingerprintLocker() *fingerprintLocker {
-	lockPool := make([]*fingerprintLock, 100)
+func newFingerprintLocker(preallocatedMutexes int) *fingerprintLocker {
+	lockPool := make([]*fingerprintLock, preallocatedMutexes)
 	for i := range lockPool {
 		lockPool[i] = &fingerprintLock{}
 	}


@@ -7,13 +7,11 @@ import (
 	clientmodel "github.com/prometheus/client_golang/model"
 )

-var httpServerStarted bool
-
 func BenchmarkFingerprintLockerParallel(b *testing.B) {
 	numGoroutines := 10
 	numFingerprints := 10
 	numLockOps := b.N
-	locker := newFingerprintLocker()
+	locker := newFingerprintLocker(100)
 	wg := sync.WaitGroup{}

 	b.ResetTimer()
@@ -36,7 +34,7 @@ func BenchmarkFingerprintLockerParallel(b *testing.B) {

 func BenchmarkFingerprintLockerSerial(b *testing.B) {
 	numFingerprints := 10
-	locker := newFingerprintLocker()
+	locker := newFingerprintLocker(100)
 	b.ResetTimer()

 	for i := 0; i < b.N; i++ {


@@ -50,9 +50,10 @@ const (
 	chunkHeaderFirstTimeOffset = 1
 	chunkHeaderLastTimeOffset  = 9

-	indexingMaxBatchSize  = 1024
+	// TODO: Consider making any of these configurable?
+	indexingMaxBatchSize  = 1024 * 1024
 	indexingBatchTimeout  = 500 * time.Millisecond // Commit batch when idle for that long.
-	indexingQueueCapacity = 10 * indexingMaxBatchSize // TODO: Export as metric.
+	indexingQueueCapacity = 1024
 )

 const (
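
For orientation, constants like these typically parameterize a batching loop: commit when the batch reaches indexingMaxBatchSize, or when the queue has been idle for indexingBatchTimeout, with indexingQueueCapacity bounding the channel feeding the loop. A self-contained sketch of that pattern with stand-in types and names (not the actual persistence code):

package main

import (
	"fmt"
	"time"
)

const (
	maxBatchSize = 1024 * 1024
	batchTimeout = 500 * time.Millisecond // Commit batch when idle for that long.
)

// runIndexer drains ops from queue and commits them either when the batch is
// full or when no new op has arrived for batchTimeout. Stand-in types only.
func runIndexer(queue <-chan string, commit func([]string)) {
	var batch []string
	for {
		select {
		case op, ok := <-queue:
			if !ok {
				// Queue closed: flush whatever is left and stop.
				if len(batch) > 0 {
					commit(batch)
				}
				return
			}
			batch = append(batch, op)
			if len(batch) >= maxBatchSize {
				commit(batch)
				batch = nil
			}
		case <-time.After(batchTimeout):
			// Idle for too long: flush the partial batch.
			if len(batch) > 0 {
				commit(batch)
				batch = nil
			}
		}
	}
}

func main() {
	queue := make(chan string, 1024) // Plays the role of the bounded indexing queue.
	go func() {
		for i := 0; i < 3; i++ {
			queue <- fmt.Sprintf("op-%d", i)
		}
		close(queue)
	}()
	runIndexer(queue, func(b []string) { fmt.Println("committing", len(b), "ops") })
}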


@@ -119,8 +119,9 @@ type memorySeries struct {
 	// (or all) chunkDescs are only on disk. These chunks are all contiguous
 	// and at the tail end.
 	chunkDescsLoaded bool
-	// Whether the current head chunk has already been persisted. If true,
-	// the current head chunk must not be modified anymore.
+	// Whether the current head chunk has already been persisted (or at
+	// least has been scheduled to be persisted). If true, the current head
+	// chunk must not be modified anymore.
 	headChunkPersisted bool
 }
@@ -171,6 +172,19 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
 	}
 }

+func (s *memorySeries) persistHeadChunk(fp clientmodel.Fingerprint, persistQueue chan *persistRequest) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	if s.headChunkPersisted {
+		return
+	}
+	s.headChunkPersisted = true
+	persistQueue <- &persistRequest{
+		fingerprint: fp,
+		chunkDesc:   s.head(),
+	}
+}
+
 func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool) {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
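
The new persistHeadChunk is a "schedule at most once" operation: the check of headChunkPersisted and the flag flip happen under the same series lock, so concurrent callers cannot enqueue the same head chunk twice. A stripped-down, runnable sketch of that pattern with simplified stand-in types (a uint64 stands in for the fingerprint and the persist request):

package main

import (
	"fmt"
	"sync"
)

// series mimics the relevant part of memorySeries: a mutex guarding a
// "head chunk already scheduled for persistence" flag.
type series struct {
	mtx                sync.Mutex
	headChunkPersisted bool
}

// persistHeadChunk enqueues the head chunk at most once, no matter how many
// goroutines call it concurrently, because the check and the flag flip
// happen under the same lock.
func (s *series) persistHeadChunk(fp uint64, persistQueue chan<- uint64) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.headChunkPersisted {
		return
	}
	s.headChunkPersisted = true
	persistQueue <- fp
}

func main() {
	s := &series{}
	queue := make(chan uint64, 10)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.persistHeadChunk(42, queue)
		}()
	}
	wg.Wait()
	close(queue)
	fmt.Println("persist requests enqueued:", len(queue)) // Always 1.
}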


@@ -201,26 +201,45 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
 }

 func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
-	s.mtx.RLock()
-	defer s.mtx.RUnlock()
+	fspsToArchive := []FingerprintSeriesPair{}
+
 	defer func(begin time.Time) {
 		evictionDuration.Set(float64(time.Since(begin) / time.Millisecond))
 	}(time.Now())

+	s.mtx.RLock()
 	for fp, series := range s.fingerprintToSeries {
 		if series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) {
-			if err := s.persistence.ArchiveMetric(
-				fp, series.metric, series.firstTime(), series.lastTime(),
-			); err != nil {
-				glog.Errorf("Error archiving metric %v: %v", series.metric, err)
-			}
-			delete(s.fingerprintToSeries, fp)
-			s.persistQueue <- &persistRequest{
-				fingerprint: fp,
-				chunkDesc:   series.head(),
-			}
+			fspsToArchive = append(fspsToArchive, FingerprintSeriesPair{
+				Fingerprint: fp,
+				Series:      series,
+			})
 		}
+		series.persistHeadChunk(fp, s.persistQueue)
 	}
+	s.mtx.RUnlock()
+
+	if len(fspsToArchive) == 0 {
+		return
+	}
+
+	// If we are here, we have metrics to archive. For that, we need the write lock.
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	for _, fsp := range fspsToArchive {
+		// TODO: Need series lock (or later FP lock)?
+		if !fsp.Series.headChunkPersisted {
+			// Oops. The series has received new samples all of a
+			// sudden, giving it a new head chunk. Leave it alone.
+			return
+		}
+		if err := s.persistence.ArchiveMetric(
+			fsp.Fingerprint, fsp.Series.metric, fsp.Series.firstTime(), fsp.Series.lastTime(),
+		); err != nil {
+			glog.Errorf("Error archiving metric %v: %v", fsp.Series.metric, err)
+		}
+		delete(s.fingerprintToSeries, fsp.Fingerprint)
+	}
 }
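
The reworked evictMemoryChunks follows a two-phase pattern: gather archiving candidates under the cheap read lock, release it, then take the write lock and re-validate each candidate (here via headChunkPersisted) before deleting it, since a series may have received new samples between the two lock acquisitions. A minimal sketch of that collect-then-revalidate pattern with stand-in types (not the actual storage code):

package main

import (
	"fmt"
	"sync"
)

type entry struct {
	archivable bool
}

type store struct {
	mtx     sync.RWMutex
	entries map[uint64]*entry
}

// sweep gathers candidates under the read lock, then re-checks every
// candidate under the write lock before removing it, because the entry may
// have changed between the two lock acquisitions.
func (s *store) sweep() {
	var candidates []uint64

	s.mtx.RLock()
	for fp, e := range s.entries {
		if e.archivable {
			candidates = append(candidates, fp)
		}
	}
	s.mtx.RUnlock()

	if len(candidates) == 0 {
		return
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()
	for _, fp := range candidates {
		e, ok := s.entries[fp]
		if !ok || !e.archivable {
			continue // Changed since we looked; leave it alone.
		}
		delete(s.entries, fp)
	}
}

func main() {
	s := &store{entries: map[uint64]*entry{1: {archivable: true}, 2: {}}}
	s.sweep()
	fmt.Println("entries left:", len(s.entries)) // 1
}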