Move to a queue model for appending samples after all.

Starting a goroutine takes 1-2µs on my laptop. From the "numbers every
Go programmer should know", I had 300ns for a channel send in my
mind. Turns out, on my laptop, it takes only 60ns. That's fast enough
to warrant the machinery of yet another channel with a fixed set of
worker goroutines feeding from it. The number chosen (8 for now) is
low enough to not inflict any measurable overhead (a big
Prometheus server has >1000 goroutines running), but high enough to
not make sample ingestion a bottleneck.
commit e22f26bc58 (parent fe518fdb28)
beorn7, 2015-02-13 14:26:54 +01:00
2 changed files with 36 additions and 17 deletions
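For context on the numbers in the message: a rough way to reproduce them is a
pair of Go benchmarks along these lines (not part of this commit; the package
and function names are made up, and the goroutine benchmark includes one
channel round-trip so it slightly overstates the pure start cost). Run with
"go test -bench=.":

package appendbench

import "testing"

// Cost of starting a goroutine, plus one channel round-trip so the
// benchmark knows the goroutine actually ran.
func BenchmarkGoroutineStart(b *testing.B) {
	done := make(chan struct{})
	for i := 0; i < b.N; i++ {
		go func() { done <- struct{}{} }()
		<-done
	}
}

// Cost of a send to a buffered channel drained by one long-lived worker,
// i.e. the per-sample price of the queue model in this commit.
func BenchmarkChannelSend(b *testing.B) {
	ch := make(chan int, 16)
	go func() {
		for range ch {
		}
	}()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ch <- i
	}
	close(ch)
}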

storage/local/persistence.go

@@ -84,7 +84,7 @@ type indexingOp struct {
 // A Persistence is used by a Storage implementation to store samples
 // persistently across restarts. The methods are only goroutine-safe if
-// explicitly marked as such below. The chunk-related methods PersistChunk,
+// explicitly marked as such below. The chunk-related methods persistChunk,
 // dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
 // each other if each call refers to a different fingerprint.
 type persistence struct {

storage/local/storage.go

@@ -39,6 +39,9 @@ const (
 	maxEvictInterval = time.Minute
 	headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
+
+	appendWorkers  = 8 // Should be enough to not make appending a bottleneck.
+	appendQueueCap = 2 * appendWorkers
 )

 type storageState uint
@@ -69,10 +72,9 @@ type memorySeriesStorage struct {
 	checkpointInterval         time.Duration
 	checkpointDirtySeriesLimit int

-	// The timestamp of the last sample appended.
-	lastTimestampAppended clientmodel.Timestamp
-	// Wait group for goroutines appending samples with the same timestamp.
-	appendWaitGroup sync.WaitGroup
+	appendQueue         chan *clientmodel.Sample
+	appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
+	appendWaitGroup     sync.WaitGroup        // To wait for all appended samples to be processed.

 	persistQueue   chan persistRequest
 	persistStopped chan struct{}
@@ -129,7 +131,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 	})
 	numSeries.Set(float64(fpToSeries.length()))

-	return &memorySeriesStorage{
+	s := &memorySeriesStorage{
 		fpLocker:   newFingerprintLocker(1024),
 		fpToSeries: fpToSeries,
@ -140,7 +142,8 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
checkpointInterval: o.CheckpointInterval, checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
lastTimestampAppended: clientmodel.Earliest, appendLastTimestamp: clientmodel.Earliest,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity), persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity),
persistStopped: make(chan struct{}), persistStopped: make(chan struct{}),
@@ -201,7 +204,18 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 			Name: "invalid_preload_requests_total",
 			Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
 		}),
-	}, nil
+	}
+
+	for i := 0; i < appendWorkers; i++ {
+		go func() {
+			for sample := range s.appendQueue {
+				s.appendSample(sample)
+				s.appendWaitGroup.Done()
+			}
+		}()
+	}
+
+	return s, nil
 }

 // Start implements Storage.
@@ -215,6 +229,11 @@ func (s *memorySeriesStorage) Start() {
 func (s *memorySeriesStorage) Stop() error {
 	glog.Info("Stopping local storage...")

+	glog.Info("Draining append queue...")
+	close(s.appendQueue)
+	s.appendWaitGroup.Wait()
+	glog.Info("Append queue drained.")
+
 	glog.Info("Stopping maintenance loop...")
 	close(s.loopStopping)
 	<-s.loopStopped
@@ -371,18 +390,18 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
 // AppendSamples implements Storage.
 func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
 	for _, sample := range samples {
-		if sample.Timestamp != s.lastTimestampAppended {
-			// Timestamp has changed. We have to wait for all
-			// appendSample to complete before proceeding.
+		if sample.Timestamp != s.appendLastTimestamp {
+			// Timestamp has changed. We have to wait for processing
+			// of all appended samples before proceeding. Otherwise,
+			// we might violate the storage contract that each
+			// sample appended to a given series has to have a
+			// timestamp greater or equal to the previous sample
+			// appended to that series.
 			s.appendWaitGroup.Wait()
-			s.lastTimestampAppended = sample.Timestamp
+			s.appendLastTimestamp = sample.Timestamp
 		}
 		s.appendWaitGroup.Add(1)
-		go func(sample *clientmodel.Sample) {
-			s.appendSample(sample)
-			s.appendWaitGroup.Done()
-		}(sample)
+		s.appendQueue <- sample
 	}
 }
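Condensed out of the diff, the queue model is roughly the following sketch.
This is not the real code: sample, storage, and appendSample are simplified
stand-ins for the clientmodel types and the memorySeriesStorage method, and
the sentinel -1 << 63 stands in for clientmodel.Earliest.

package main

import "sync"

const (
	appendWorkers  = 8
	appendQueueCap = 2 * appendWorkers
)

type sample struct {
	timestamp int64
	value     float64
}

type storage struct {
	appendQueue         chan *sample
	appendLastTimestamp int64 // Timestamp of the last sample sent to the queue.
	appendWaitGroup     sync.WaitGroup
}

func newStorage() *storage {
	s := &storage{
		appendQueue:         make(chan *sample, appendQueueCap),
		appendLastTimestamp: -1 << 63, // Stand-in for clientmodel.Earliest.
	}
	// Fixed worker pool feeding from the queue.
	for i := 0; i < appendWorkers; i++ {
		go func() {
			for sm := range s.appendQueue {
				s.appendSample(sm)
				s.appendWaitGroup.Done()
			}
		}()
	}
	return s
}

func (s *storage) appendSample(sm *sample) {
	// The real code appends the sample to its in-memory series here.
	_ = sm
}

// appendSamples mirrors AppendSamples: whenever the timestamp changes,
// wait for all in-flight appends first, so no series ever sees samples
// out of timestamp order.
func (s *storage) appendSamples(samples []*sample) {
	for _, sm := range samples {
		if sm.timestamp != s.appendLastTimestamp {
			s.appendWaitGroup.Wait()
			s.appendLastTimestamp = sm.timestamp
		}
		s.appendWaitGroup.Add(1)
		s.appendQueue <- sm
	}
}

// stop drains the queue, as Stop does above.
func (s *storage) stop() {
	close(s.appendQueue)
	s.appendWaitGroup.Wait()
}

func main() {
	s := newStorage()
	s.appendSamples([]*sample{
		{timestamp: 1, value: 0.5},
		{timestamp: 1, value: 0.7},
		{timestamp: 2, value: 0.6},
	})
	s.stop()
}

A side effect of the bounded queue (appendQueueCap = 2 * appendWorkers) is
backpressure: when the workers fall behind, AppendSamples blocks on the send
instead of spawning an unbounded number of goroutines, as the previous
per-sample go func approach could.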