mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Improve performance of ingestion.
- Parallelize AppendSamples as much as possible without breaking the contract about temporal order.
- Allocate more fingerprint locker slots.
- Do not run early checkpoints if we are behind on chunk persistence.
- Increase fpMinWaitDuration to give the disk more time for more important things.

Also, switch math.MaxInt64 and math.MinInt64 to the new constants (clientmodel.Latest and clientmodel.Earliest, respectively).
This commit is contained in:
parent
3d0fb51648
commit
5d3cd65a5d
|
@ -16,7 +16,6 @@ package local
|
|||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
@ -342,7 +341,7 @@ func (p *persistence) cleanUpArchiveIndexes(
|
|||
if err := kv.Value(&m); err != nil {
|
||||
return err
|
||||
}
|
||||
series := newMemorySeries(clientmodel.Metric(m), false, math.MinInt64)
|
||||
series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest)
|
||||
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -27,9 +27,12 @@ import (
|
|||
// are goroutine-safe.
|
||||
type Storage interface {
|
||||
prometheus.Collector
|
||||
// AppendSamples stores a group of new samples. Multiple samples for the same
|
||||
// fingerprint need to be submitted in chronological order, from oldest to
|
||||
// newest (both in the same call to AppendSamples and across multiple calls).
|
||||
// AppendSamples stores a group of new samples. Multiple samples for the
|
||||
// same fingerprint need to be submitted in chronological order, from
|
||||
// oldest to newest (both in the same call to AppendSamples and across
|
||||
// multiple calls). When AppendSamples has returned, the appended
|
||||
// samples might not be queryable immediately. (Use WaitForIndexing to
|
||||
// wait for complete processing.)
|
||||
AppendSamples(clientmodel.Samples)
|
||||
// NewPreloader returns a new Preloader which allows preloading and pinning
|
||||
// series data into memory for use within a query.
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
package local
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -169,7 +168,7 @@ type memorySeries struct {
|
|||
// will be set properly upon the first eviction of chunkDescs).
|
||||
func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp) *memorySeries {
|
||||
if reallyNew {
|
||||
firstTime = math.MinInt64
|
||||
firstTime = clientmodel.Earliest
|
||||
}
|
||||
s := memorySeries{
|
||||
metric: m,
|
||||
|
@ -337,7 +336,7 @@ func (s *memorySeries) preloadChunksForRange(
|
|||
from clientmodel.Timestamp, through clientmodel.Timestamp,
|
||||
fp clientmodel.Fingerprint, mss *memorySeriesStorage,
|
||||
) ([]*chunkDesc, error) {
|
||||
firstChunkDescTime := clientmodel.Timestamp(math.MaxInt64)
|
||||
firstChunkDescTime := clientmodel.Latest
|
||||
if len(s.chunkDescs) > 0 {
|
||||
firstChunkDescTime = s.chunkDescs[0].firstTime()
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ package local
|
|||
|
||||
import (
|
||||
"container/list"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -34,7 +34,7 @@ const (
|
|||
|
||||
// See waitForNextFP.
|
||||
fpMaxWaitDuration = 10 * time.Second
|
||||
fpMinWaitDuration = 5 * time.Millisecond // ~ hard disk seek time.
|
||||
fpMinWaitDuration = 20 * time.Millisecond // A small multiple of disk seek time.
|
||||
fpMaxSweepTime = 6 * time.Hour
|
||||
|
||||
maxEvictInterval = time.Minute
|
||||
|
@ -69,6 +69,12 @@ type memorySeriesStorage struct {
|
|||
checkpointInterval time.Duration
|
||||
checkpointDirtySeriesLimit int
|
||||
|
||||
// The timestamp of the last sample appended.
|
||||
lastTimestampAppended clientmodel.Timestamp
|
||||
// Wait group for goroutines appending samples with the same timestamp.
|
||||
appendWaitGroup sync.WaitGroup
|
||||
appendMtx sync.Mutex
|
||||
|
||||
persistQueue chan persistRequest
|
||||
persistStopped chan struct{}
|
||||
persistence *persistence
|
||||
|
@ -125,7 +131,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
|||
numSeries.Set(float64(fpToSeries.length()))
|
||||
|
||||
return &memorySeriesStorage{
|
||||
fpLocker: newFingerprintLocker(256),
|
||||
fpLocker: newFingerprintLocker(1024),
|
||||
fpToSeries: fpToSeries,
|
||||
|
||||
loopStopping: make(chan struct{}),
|
||||
|
@ -135,6 +141,8 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
|
|||
checkpointInterval: o.CheckpointInterval,
|
||||
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
|
||||
|
||||
lastTimestampAppended: clientmodel.Earliest,
|
||||
|
||||
persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity),
|
||||
persistStopped: make(chan struct{}),
|
||||
persistence: p,
|
||||
|
@ -234,6 +242,9 @@ func (s *memorySeriesStorage) Stop() error {
|
|||
|
||||
// WaitForIndexing implements Storage.
|
||||
func (s *memorySeriesStorage) WaitForIndexing() {
|
||||
// First let all goroutines appending samples stop.
|
||||
s.appendWaitGroup.Wait()
|
||||
// Only then wait for the persistence to index them.
|
||||
s.persistence.waitForIndexing()
|
||||
}
|
||||
|
||||
|
@ -360,11 +371,28 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
|
|||
|
||||
// AppendSamples implements Storage.
|
||||
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
|
||||
for _, sample := range samples {
|
||||
s.appendSample(sample)
|
||||
if len(samples) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
s.appendMtx.Lock()
|
||||
defer s.appendMtx.Unlock()
|
||||
|
||||
for _, sample := range samples {
|
||||
if sample.Timestamp != s.lastTimestampAppended {
|
||||
// Timestamp has changed. We have to wait for all
|
||||
// appendSample to complete before proceeding.
|
||||
s.appendWaitGroup.Wait()
|
||||
s.lastTimestampAppended = sample.Timestamp
|
||||
}
|
||||
|
||||
s.appendWaitGroup.Add(1)
|
||||
go func(sample *clientmodel.Sample) {
|
||||
s.appendSample(sample)
|
||||
s.appendWaitGroup.Done()
|
||||
}(sample)
|
||||
}
|
||||
|
||||
s.ingestedSamplesCount.Add(float64(len(samples)))
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
|
||||
|
@ -376,6 +404,8 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
|
|||
Timestamp: sample.Timestamp,
|
||||
})
|
||||
s.fpLocker.Unlock(fp)
|
||||
s.ingestedSamplesCount.Inc()
|
||||
|
||||
if len(chunkDescsToPersist) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -700,7 +730,16 @@ loop:
|
|||
s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
|
||||
case <-s.countPersistedHeadChunks:
|
||||
headChunksPersistedSinceLastCheckpoint++
|
||||
if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit {
|
||||
// Check if we have enough "dirty" series so that we need an early checkpoint.
|
||||
// As described above, we take the headChunksPersistedSinceLastCheckpoint as a
|
||||
// heuristic for "dirty" series. However, if we are already backlogging
|
||||
// chunks to be persisted, creating a checkpoint would be counterproductive,
|
||||
// as it would slow down chunk persisting even more, while in a situation like
|
||||
// that, the best we can do for crash recovery is to work through the persist
|
||||
// queue as quickly as possible. So only checkpoint if s.persistQueue is
|
||||
// at most 20% full.
|
||||
if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit &&
|
||||
len(s.persistQueue) < cap(s.persistQueue)/5 {
|
||||
checkpointTimer.Reset(0)
|
||||
}
|
||||
}
|
||||
|
@ -751,7 +790,7 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
|
|||
// Make sure we have a head chunk descriptor (a freshly
|
||||
// unarchived series has none).
|
||||
if len(series.chunkDescs) == 0 {
|
||||
cds, err := s.loadChunkDescs(fp, math.MaxInt64)
|
||||
cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err)
|
||||
return
|
||||
|
|
|
@ -70,6 +70,7 @@ func TestChunk(t *testing.T) {
|
|||
defer closer.Close()
|
||||
|
||||
s.AppendSamples(samples)
|
||||
s.WaitForIndexing()
|
||||
|
||||
for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
|
||||
s.(*memorySeriesStorage).fpLocker.Lock(m.fp)
|
||||
|
@ -109,6 +110,7 @@ func TestGetValueAtTime(t *testing.T) {
|
|||
defer closer.Close()
|
||||
|
||||
s.AppendSamples(samples)
|
||||
s.WaitForIndexing()
|
||||
|
||||
fp := clientmodel.Metric{}.Fingerprint()
|
||||
|
||||
|
@ -191,6 +193,7 @@ func TestGetRangeValues(t *testing.T) {
|
|||
defer closer.Close()
|
||||
|
||||
s.AppendSamples(samples)
|
||||
s.WaitForIndexing()
|
||||
|
||||
fp := clientmodel.Metric{}.Fingerprint()
|
||||
|
||||
|
@ -334,6 +337,7 @@ func TestEvictAndPurgeSeries(t *testing.T) {
|
|||
ms := s.(*memorySeriesStorage) // Going to test the internal purgeSeries method.
|
||||
|
||||
s.AppendSamples(samples)
|
||||
s.WaitForIndexing()
|
||||
|
||||
fp := clientmodel.Metric{}.Fingerprint()
|
||||
|
||||
|
@ -368,6 +372,7 @@ func TestEvictAndPurgeSeries(t *testing.T) {
|
|||
|
||||
// Recreate series.
|
||||
s.AppendSamples(samples)
|
||||
s.WaitForIndexing()
|
||||
|
||||
series, ok := ms.fpToSeries.get(fp)
|
||||
if !ok {
|
||||
|
@ -450,6 +455,7 @@ func TestFuzz(t *testing.T) {
|
|||
|
||||
samples := createRandomSamples()
|
||||
s.AppendSamples(samples)
|
||||
s.WaitForIndexing()
|
||||
|
||||
return verifyStorage(t, s, samples, 24*7*time.Hour)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue