Merge pull request #524 from prometheus/beorn7/ingestion-tweaks

Improve performance of ingestion.
Björn Rabenstein 2015-02-13 14:49:16 +01:00
commit 498e1b5154
8 changed files with 82 additions and 25 deletions


@@ -218,11 +218,9 @@ func (p *prometheus) Serve() {
 	}()

 	for samples := range p.unwrittenSamples {
-		if len(samples) > 0 {
-			p.storage.AppendSamples(samples)
-			if p.remoteTSDBQueue != nil {
-				p.remoteTSDBQueue.Queue(samples)
-			}
+		p.storage.AppendSamples(samples)
+		if p.remoteTSDBQueue != nil {
+			p.remoteTSDBQueue.Queue(samples)
 		}
 	}


@@ -16,7 +16,6 @@ package local
 import (
 	"fmt"
 	"io"
-	"math"
 	"os"
 	"path"
 	"strings"
@@ -342,7 +341,7 @@ func (p *persistence) cleanUpArchiveIndexes(
 		if err := kv.Value(&m); err != nil {
 			return err
 		}
-		series := newMemorySeries(clientmodel.Metric(m), false, math.MinInt64)
+		series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest)
 		cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
 		if err != nil {
 			return err


@@ -24,12 +24,15 @@ import (
 )

 // Storage ingests and manages samples, along with various indexes. All methods
-// are goroutine-safe.
+// except AppendSamples are goroutine-safe.
 type Storage interface {
 	prometheus.Collector
-	// AppendSamples stores a group of new samples. Multiple samples for the same
-	// fingerprint need to be submitted in chronological order, from oldest to
-	// newest (both in the same call to AppendSamples and across multiple calls).
+	// AppendSamples stores a group of new samples. Multiple samples for the
+	// same fingerprint need to be submitted in chronological order, from
+	// oldest to newest (both in the same call to AppendSamples and across
+	// multiple calls). When AppendSamples has returned, the appended
+	// samples might not be queryable immediately. (Use WaitForIndexing to
+	// wait for complete processing.) This method is not goroutine-safe.
 	AppendSamples(clientmodel.Samples)
 	// NewPreloader returns a new Preloader which allows preloading and pinning
 	// series data into memory for use within a query.
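Not part of this commit: a minimal sketch of the call pattern implied by the revised contract above. The helper name and import paths are illustrative assumptions; the point is that AppendSamples is driven from a single goroutine and WaitForIndexing is used before the appended data is queried.

package example

import (
	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/storage/local"
)

// appendAndWait is a hypothetical helper: AppendSamples is not goroutine-safe,
// so a single goroutine appends, then WaitForIndexing blocks until the samples
// are fully processed and queryable.
func appendAndWait(s local.Storage, samples clientmodel.Samples) {
	s.AppendSamples(samples) // ordered oldest to newest per fingerprint
	s.WaitForIndexing()
}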


@@ -84,7 +84,7 @@ type indexingOp struct {

 // A Persistence is used by a Storage implementation to store samples
 // persistently across restarts. The methods are only goroutine-safe if
-// explicitly marked as such below. The chunk-related methods PersistChunk,
+// explicitly marked as such below. The chunk-related methods persistChunk,
 // dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
 // each other if each call refers to a different fingerprint.
 type persistence struct {


@@ -14,7 +14,6 @@
 package local

 import (
-	"math"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -169,7 +168,7 @@ type memorySeries struct {
 // will be set properly upon the first eviction of chunkDescs).
 func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp) *memorySeries {
 	if reallyNew {
-		firstTime = math.MinInt64
+		firstTime = clientmodel.Earliest
 	}
 	s := memorySeries{
 		metric: m,
@@ -337,7 +336,7 @@ func (s *memorySeries) preloadChunksForRange(
 	from clientmodel.Timestamp, through clientmodel.Timestamp,
 	fp clientmodel.Fingerprint, mss *memorySeriesStorage,
 ) ([]*chunkDesc, error) {
-	firstChunkDescTime := clientmodel.Timestamp(math.MaxInt64)
+	firstChunkDescTime := clientmodel.Latest
 	if len(s.chunkDescs) > 0 {
 		firstChunkDescTime = s.chunkDescs[0].firstTime()
 	}


@@ -16,7 +16,7 @@ package local
 import (
 	"container/list"
-	"math"
+	"sync"
 	"sync/atomic"
 	"time"
@@ -34,11 +34,14 @@ const (
 	// See waitForNextFP.
 	fpMaxWaitDuration = 10 * time.Second
-	fpMinWaitDuration = 5 * time.Millisecond // ~ hard disk seek time.
+	fpMinWaitDuration = 20 * time.Millisecond // A small multiple of disk seek time.
 	fpMaxSweepTime = 6 * time.Hour
 	maxEvictInterval = time.Minute
 	headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
+	appendWorkers  = 8 // Should be enough to not make appending a bottleneck.
+	appendQueueCap = 2 * appendWorkers
 )

 type storageState uint
@@ -69,6 +72,10 @@ type memorySeriesStorage struct {
 	checkpointInterval         time.Duration
 	checkpointDirtySeriesLimit int

+	appendQueue         chan *clientmodel.Sample
+	appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
+	appendWaitGroup     sync.WaitGroup        // To wait for all appended samples to be processed.
+
 	persistQueue   chan persistRequest
 	persistStopped chan struct{}
 	persistence    *persistence
@@ -124,8 +131,8 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 	})
 	numSeries.Set(float64(fpToSeries.length()))

-	return &memorySeriesStorage{
-		fpLocker:   newFingerprintLocker(256),
+	s := &memorySeriesStorage{
+		fpLocker:   newFingerprintLocker(1024),
 		fpToSeries: fpToSeries,
 		loopStopping: make(chan struct{}),
@@ -135,6 +142,9 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,

+		appendLastTimestamp: clientmodel.Earliest,
+		appendQueue:         make(chan *clientmodel.Sample, appendQueueCap),
+
 		persistQueue:   make(chan persistRequest, o.PersistenceQueueCapacity),
 		persistStopped: make(chan struct{}),
 		persistence:    p,
@@ -194,7 +204,18 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 			Name: "invalid_preload_requests_total",
 			Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
 		}),
-	}, nil
+	}
+
+	for i := 0; i < appendWorkers; i++ {
+		go func() {
+			for sample := range s.appendQueue {
+				s.appendSample(sample)
+				s.appendWaitGroup.Done()
+			}
+		}()
+	}
+
+	return s, nil
 }

 // Start implements Storage.
@@ -208,6 +229,11 @@ func (s *memorySeriesStorage) Start() {
 func (s *memorySeriesStorage) Stop() error {
 	glog.Info("Stopping local storage...")

+	glog.Info("Draining append queue...")
+	close(s.appendQueue)
+	s.appendWaitGroup.Wait()
+	glog.Info("Append queue drained.")
+
 	glog.Info("Stopping maintenance loop...")
 	close(s.loopStopping)
 	<-s.loopStopped
@@ -234,6 +260,9 @@ func (s *memorySeriesStorage) Stop() error {
 // WaitForIndexing implements Storage.
 func (s *memorySeriesStorage) WaitForIndexing() {
+	// First let all goroutines appending samples stop.
+	s.appendWaitGroup.Wait()
+	// Only then wait for the persistence to index them.
 	s.persistence.waitForIndexing()
 }
@@ -361,10 +390,19 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
 // AppendSamples implements Storage.
 func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
 	for _, sample := range samples {
-		s.appendSample(sample)
+		if sample.Timestamp != s.appendLastTimestamp {
+			// Timestamp has changed. We have to wait for processing
+			// of all appended samples before proceeding. Otherwise,
+			// we might violate the storage contract that each
+			// sample appended to a given series has to have a
+			// timestamp greater or equal to the previous sample
+			// appended to that series.
+			s.appendWaitGroup.Wait()
+			s.appendLastTimestamp = sample.Timestamp
+		}
+		s.appendWaitGroup.Add(1)
+		s.appendQueue <- sample
 	}
-
-	s.ingestedSamplesCount.Add(float64(len(samples)))
 }

 func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
@@ -376,6 +414,8 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
 		Timestamp: sample.Timestamp,
 	})
 	s.fpLocker.Unlock(fp)
+	s.ingestedSamplesCount.Inc()
+
 	if len(chunkDescsToPersist) == 0 {
 		return
 	}
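The hunks above form the new ingestion path: a buffered append queue, a fixed pool of worker goroutines started in NewMemorySeriesStorage, and a WaitGroup that AppendSamples uses to flush the queue whenever the sample timestamp advances (and that Stop and WaitForIndexing wait on). The following standalone Go sketch is not taken from this commit and uses made-up names; it only illustrates the same pattern in isolation.

package main

import (
	"fmt"
	"math"
	"sync"
)

type sample struct {
	series    string
	timestamp int64
	value     float64
}

type appender struct {
	queue   chan *sample
	pending sync.WaitGroup
	lastTS  int64
}

// newAppender starts a fixed pool of workers that drain the buffered queue and
// hand each sample to the (possibly slow) store function concurrently.
func newAppender(workers, queueCap int, store func(*sample)) *appender {
	a := &appender{queue: make(chan *sample, queueCap), lastTS: math.MinInt64}
	for i := 0; i < workers; i++ {
		go func() {
			for s := range a.queue {
				store(s) // per-series work happens in parallel across workers
				a.pending.Done()
			}
		}()
	}
	return a
}

// append enqueues samples, flushing all in-flight work whenever the timestamp
// advances so that per-series ordering is preserved across worker goroutines.
func (a *appender) append(samples []*sample) {
	for _, s := range samples {
		if s.timestamp != a.lastTS {
			a.pending.Wait()
			a.lastTS = s.timestamp
		}
		a.pending.Add(1)
		a.queue <- s
	}
}

// stop drains the queue and waits for all in-flight samples to be processed.
func (a *appender) stop() {
	close(a.queue)
	a.pending.Wait()
}

func main() {
	a := newAppender(8, 16, func(s *sample) { fmt.Println("stored", s.series, s.timestamp, s.value) })
	a.append([]*sample{
		{"up", 1, 1}, {"cpu", 1, 0.5}, // same timestamp: may be stored concurrently
		{"up", 2, 1}, // new timestamp: waits for the batch above to finish first
	})
	a.stop()
}

The flush on timestamp change is what keeps the per-series ordering guarantee from the Storage contract while still letting samples that share a timestamp be appended in parallel.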
@@ -700,7 +740,16 @@ loop:
 			s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
 		case <-s.countPersistedHeadChunks:
 			headChunksPersistedSinceLastCheckpoint++
-			if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit {
+			// Check if we have enough "dirty" series so that we need an early checkpoint.
+			// As described above, we take the headChunksPersistedSinceLastCheckpoint as a
+			// heuristic for "dirty" series. However, if we are already backlogging
+			// chunks to be persisted, creating a checkpoint would be counterproductive,
+			// as it would slow down chunk persisting even more, while in a situation like
+			// that, the best we can do for crash recovery is to work through the persist
+			// queue as quickly as possible. So only checkpoint if s.persistQueue is
+			// at most 20% full.
+			if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit &&
+				len(s.persistQueue) < cap(s.persistQueue)/5 {
 				checkpointTimer.Reset(0)
 			}
 		}
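To make the new condition concrete (the capacity figure is assumed, not from this commit): with a persist queue capacity of 1,000, the early checkpoint fires only while fewer than 200 chunks are waiting to be persisted. A minimal standalone restatement of the rule:

package main

import "fmt"

// shouldCheckpointEarly is a hypothetical helper restating the condition above:
// enough "dirty" series have accumulated and the persist queue is less than 20% full.
func shouldCheckpointEarly(dirtySeries, dirtyLimit, queueLen, queueCap int) bool {
	return dirtySeries >= dirtyLimit && queueLen < queueCap/5
}

func main() {
	fmt.Println(shouldCheckpointEarly(600, 500, 150, 1000)) // true: queue only 15% full
	fmt.Println(shouldCheckpointEarly(600, 500, 400, 1000)) // false: persist queue backlogged
}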
@@ -751,7 +800,7 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 		// Make sure we have a head chunk descriptor (a freshly
 		// unarchived series has none).
 		if len(series.chunkDescs) == 0 {
-			cds, err := s.loadChunkDescs(fp, math.MaxInt64)
+			cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
 			if err != nil {
 				glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err)
 				return


@@ -70,6 +70,7 @@ func TestChunk(t *testing.T) {
 	defer closer.Close()

 	s.AppendSamples(samples)
+	s.WaitForIndexing()

 	for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
 		s.(*memorySeriesStorage).fpLocker.Lock(m.fp)
@@ -109,6 +110,7 @@ func TestGetValueAtTime(t *testing.T) {
 	defer closer.Close()

 	s.AppendSamples(samples)
+	s.WaitForIndexing()

 	fp := clientmodel.Metric{}.Fingerprint()
@@ -191,6 +193,7 @@ func TestGetRangeValues(t *testing.T) {
 	defer closer.Close()

 	s.AppendSamples(samples)
+	s.WaitForIndexing()

 	fp := clientmodel.Metric{}.Fingerprint()
@@ -334,6 +337,7 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 	ms := s.(*memorySeriesStorage) // Going to test the internal purgeSeries method.

 	s.AppendSamples(samples)
+	s.WaitForIndexing()

 	fp := clientmodel.Metric{}.Fingerprint()
@@ -368,6 +372,7 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 	// Recreate series.
 	s.AppendSamples(samples)
+	s.WaitForIndexing()

 	series, ok := ms.fpToSeries.get(fp)
 	if !ok {
@@ -450,6 +455,7 @@ func TestFuzz(t *testing.T) {
 	samples := createRandomSamples()
 	s.AppendSamples(samples)
+	s.WaitForIndexing()

 	return verifyStorage(t, s, samples, 24*7*time.Hour)
 }


@@ -115,6 +115,9 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
 // Queue queues a sample batch to be sent to the TSDB. It drops the most
 // recently queued samples on the floor if the queue is full.
 func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
+	if len(s) == 0 {
+		return
+	}
 	select {
 	case t.queue <- s:
 	default: