Improve instrumentation in storage.

Also, fix some other minor bugs.

Change-Id: If72f1c058b0f47d3e378fdf80228d7e9a8db06c7
This commit is contained in:
Bjoern Rabenstein 2014-10-22 19:21:23 +02:00
parent 351e66c5d2
commit 443dd33805
8 changed files with 303 additions and 246 deletions

View file

@ -15,12 +15,172 @@ package local
import ( import (
"io" "io"
"sync"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
// Instrumentation.
// Note that the metrics are often set in bulk by the caller
// for performance reasons.
var (
chunkOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunk_ops_total",
Help: "The total number of chunk operations by their type.",
},
[]string{opTypeLabel},
)
chunkDescOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkdesc_ops_total",
Help: "The total number of chunk descriptor operations by their type.",
},
[]string{opTypeLabel},
)
)
const (
// Op-types for chunkOps.
createAndPin = "create" // A chunkDesc creation with refCount=1.
persistAndUnpin = "persist"
pin = "pin" // Excluding the pin on creation.
unpin = "unpin" // Excluding the unpin on persisting.
clone = "clone"
transcode = "transcode"
purge = "purge"
// Op-types for chunkOps and chunkDescOps.
evict = "evict"
load = "load"
)
func init() {
prometheus.MustRegister(chunkOps)
prometheus.MustRegister(chunkDescOps)
}
type chunkDesc struct {
sync.Mutex
chunk chunk
refCount int
evict bool
chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted.
chunkLastTime clientmodel.Timestamp // Used if chunk is evicted.
}
// newChunkDesc creates a new chunkDesc pointing to the given chunk (may be
// nil). The refCount of the new chunkDesc is 1.
func newChunkDesc(c chunk) *chunkDesc {
chunkOps.WithLabelValues(createAndPin).Inc()
return &chunkDesc{chunk: c, refCount: 1}
}
func (cd *chunkDesc) add(s *metric.SamplePair) []chunk {
cd.Lock()
defer cd.Unlock()
return cd.chunk.add(s)
}
func (cd *chunkDesc) pin() {
cd.Lock()
defer cd.Unlock()
cd.refCount++
}
func (cd *chunkDesc) unpin() {
cd.Lock()
defer cd.Unlock()
if cd.refCount == 0 {
panic("cannot unpin already unpinned chunk")
}
cd.refCount--
if cd.refCount == 0 && cd.evict {
cd.evictNow()
}
}
func (cd *chunkDesc) firstTime() clientmodel.Timestamp {
cd.Lock()
defer cd.Unlock()
if cd.chunk == nil {
return cd.chunkFirstTime
}
return cd.chunk.firstTime()
}
func (cd *chunkDesc) lastTime() clientmodel.Timestamp {
cd.Lock()
defer cd.Unlock()
if cd.chunk == nil {
return cd.chunkLastTime
}
return cd.chunk.lastTime()
}
func (cd *chunkDesc) isEvicted() bool {
cd.Lock()
defer cd.Unlock()
return cd.chunk == nil
}
func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool {
return !t.Before(cd.firstTime()) && !t.After(cd.lastTime())
}
// setChunk points this chunkDesc to the given chunk. It panics if
// this chunkDesc already has a chunk set.
func (cd *chunkDesc) setChunk(c chunk) {
cd.Lock()
defer cd.Unlock()
if cd.chunk != nil {
panic("chunk already set")
}
cd.evict = false
cd.chunk = c
}
// evictOnUnpin evicts the chunk once unpinned. If it is not pinned when this
// method is called, it evicts the chunk immediately and returns true. If the
// chunk is already evicted when this method is called, it returns true, too.
func (cd *chunkDesc) evictOnUnpin() bool {
cd.Lock()
defer cd.Unlock()
if cd.chunk == nil {
// Already evicted.
return true
}
cd.evict = true
if cd.refCount == 0 {
cd.evictNow()
return true
}
return false
}
// evictNow is an internal helper method.
func (cd *chunkDesc) evictNow() {
cd.chunkFirstTime = cd.chunk.firstTime()
cd.chunkLastTime = cd.chunk.lastTime()
cd.chunk = nil
chunkOps.WithLabelValues(evict).Inc()
}
// chunk is the interface for all chunks. Chunks are generally not // chunk is the interface for all chunks. Chunks are generally not
// goroutine-safe. // goroutine-safe.
type chunk interface { type chunk interface {
@ -62,7 +222,7 @@ type chunkIterator interface {
} }
func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk { func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk {
numTranscodes.Inc() chunkOps.WithLabelValues(transcode).Inc()
head := dst head := dst
body := []chunk{} body := []chunk{}

View file

@ -1,83 +0,0 @@
// Copyright 2014 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"github.com/prometheus/client_golang/prometheus"
)
const (
address = "instance"
alive = "alive"
failure = "failure"
outcome = "outcome"
state = "state"
success = "success"
unreachable = "unreachable"
)
var (
numSeries = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_stored_series_count",
Help: "The number of currently stored series.",
})
numSamples = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_stored_samples_total",
Help: "The total number of stored samples.",
})
evictionDuration = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_memory_eviction_duration_milliseconds",
Help: "The duration of the last memory eviction iteration in milliseconds.",
})
purgeDuration = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_storage_purge_duration_milliseconds",
Help: "The duration of the last storage purge iteration in milliseconds.",
})
numTranscodes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_chunk_transcodes_total",
Help: "The total number of chunk transcodes.",
})
numPinnedChunks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_pinned_chunks_count",
Help: "The current number of pinned chunks.",
})
persistLatencies = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "prometheus_persist_latency_milliseconds",
Help: "A summary of latencies for persisting each chunk.",
}, []string{outcome})
persistQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_persist_queue_length",
Help: "The current number of chunks waiting in the persist queue.",
})
persistQueueCapacity = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_persist_queue_capacity",
Help: "The total capacity of the persist queue.",
})
)
func init() {
prometheus.MustRegister(numSeries)
prometheus.MustRegister(numSamples)
prometheus.MustRegister(evictionDuration)
prometheus.MustRegister(purgeDuration)
prometheus.MustRegister(numTranscodes)
prometheus.MustRegister(numPinnedChunks)
prometheus.MustRegister(persistLatencies)
prometheus.MustRegister(persistQueueLength)
prometheus.MustRegister(persistQueueCapacity)
persistQueueCapacity.Set(float64(persistQueueCap))
}

View file

@ -21,6 +21,15 @@ import (
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
// String constants for instrumentation.
const (
namespace = "prometheus"
subsystem = "local_storage"
errorLabel = "error"
opTypeLabel = "type"
)
// Storage ingests and manages samples, along with various indexes. All methods // Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. // are goroutine-safe.
type Storage interface { type Storage interface {

View file

@ -34,9 +34,6 @@ import (
) )
const ( const (
namespace = "prometheus"
subsystem = "persistence"
seriesFileSuffix = ".db" seriesFileSuffix = ".db"
seriesTempFileSuffix = ".db.tmp" seriesTempFileSuffix = ".db.tmp"
@ -258,12 +255,6 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]c
defer f.Close() defer f.Close()
chunks := make([]chunk, 0, len(indexes)) chunks := make([]chunk, 0, len(indexes))
defer func() {
if err == nil {
return
}
}()
typeBuf := make([]byte, 1) typeBuf := make([]byte, 1)
for _, idx := range indexes { for _, idx := range indexes {
_, err := f.Seek(p.offsetForChunkIndex(idx), os.SEEK_SET) _, err := f.Seek(p.offsetForChunkIndex(idx), os.SEEK_SET)
@ -343,6 +334,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
} }
cds = append(cds, cd) cds = append(cds, cd)
} }
chunkDescOps.WithLabelValues(load).Add(float64(len(cds)))
return cds, nil return cds, nil
} }
@ -504,10 +496,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
if err := chunk.unmarshal(r); err != nil { if err := chunk.unmarshal(r); err != nil {
return nil, err return nil, err
} }
chunkDescs[i] = &chunkDesc{ chunkDescs[i] = newChunkDesc(chunk)
chunk: chunk,
refCount: 1,
}
} }
} }
@ -546,6 +535,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
if err == io.EOF { if err == io.EOF {
// We ran into the end of the file without finding any chunks that should // We ran into the end of the file without finding any chunks that should
// be kept. Remove the whole file. // be kept. Remove the whole file.
chunkOps.WithLabelValues(purge).Add(float64(i))
if err := os.Remove(f.Name()); err != nil { if err := os.Remove(f.Name()); err != nil {
return true, err return true, err
} }
@ -556,6 +546,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
} }
lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(lastTimeBuf)) lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(lastTimeBuf))
if !lastTime.Before(beforeTime) { if !lastTime.Before(beforeTime) {
chunkOps.WithLabelValues(purge).Add(float64(i))
break break
} }
} }

View file

@ -106,4 +106,6 @@ func (p *memorySeriesPreloader) Close() {
for _, cd := range p.pinnedChunkDescs { for _, cd := range p.pinnedChunkDescs {
cd.unpin() cd.unpin()
} }
chunkOps.WithLabelValues(unpin).Add(float64(len(p.pinnedChunkDescs)))
} }

View file

@ -14,6 +14,7 @@
package local package local
import ( import (
"math"
"sort" "sort"
"sync" "sync"
@ -130,114 +131,6 @@ func (sm *seriesMap) fpIter() <-chan clientmodel.Fingerprint {
return ch return ch
} }
type chunkDesc struct {
sync.Mutex
chunk chunk
refCount int
evict bool
chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted.
chunkLastTime clientmodel.Timestamp // Used if chunk is evicted.
}
func (cd *chunkDesc) add(s *metric.SamplePair) []chunk {
cd.Lock()
defer cd.Unlock()
return cd.chunk.add(s)
}
func (cd *chunkDesc) pin() {
cd.Lock()
defer cd.Unlock()
numPinnedChunks.Inc()
cd.refCount++
}
func (cd *chunkDesc) unpin() {
cd.Lock()
defer cd.Unlock()
if cd.refCount == 0 {
panic("cannot unpin already unpinned chunk")
}
numPinnedChunks.Dec()
cd.refCount--
if cd.refCount == 0 && cd.evict {
cd.evictNow()
}
}
func (cd *chunkDesc) firstTime() clientmodel.Timestamp {
cd.Lock()
defer cd.Unlock()
if cd.chunk == nil {
return cd.chunkFirstTime
}
return cd.chunk.firstTime()
}
func (cd *chunkDesc) lastTime() clientmodel.Timestamp {
cd.Lock()
defer cd.Unlock()
if cd.chunk == nil {
return cd.chunkLastTime
}
return cd.chunk.lastTime()
}
func (cd *chunkDesc) isEvicted() bool {
cd.Lock()
defer cd.Unlock()
return cd.chunk == nil
}
func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool {
return !t.Before(cd.firstTime()) && !t.After(cd.lastTime())
}
func (cd *chunkDesc) open(c chunk) {
cd.Lock()
defer cd.Unlock()
if cd.refCount != 0 || cd.chunk != nil {
panic("cannot open already pinned chunk")
}
cd.evict = false
cd.chunk = c
numPinnedChunks.Inc()
cd.refCount++
}
// evictOnUnpin evicts the chunk once unpinned. If it is not pinned when this
// method is called, it evicts the chunk immediately and returns true. If the
// chunk is already evicted when this method is called, it returns true, too.
func (cd *chunkDesc) evictOnUnpin() bool {
cd.Lock()
defer cd.Unlock()
if cd.chunk == nil {
// Already evicted.
return true
}
cd.evict = true
if cd.refCount == 0 {
cd.evictNow()
return true
}
return false
}
// evictNow is an internal helper method.
func (cd *chunkDesc) evictNow() {
cd.chunkFirstTime = cd.chunk.firstTime()
cd.chunkLastTime = cd.chunk.lastTime()
cd.chunk = nil
}
type memorySeries struct { type memorySeries struct {
metric clientmodel.Metric metric clientmodel.Metric
// Sorted by start time, overlapping chunk ranges are forbidden. // Sorted by start time, overlapping chunk ranges are forbidden.
@ -268,10 +161,7 @@ func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries {
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, persistQueue chan *persistRequest) { func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, persistQueue chan *persistRequest) {
if len(s.chunkDescs) == 0 || s.headChunkPersisted { if len(s.chunkDescs) == 0 || s.headChunkPersisted {
newHead := &chunkDesc{ newHead := newChunkDesc(newDeltaEncodedChunk(d1, d0, true))
chunk: newDeltaEncodedChunk(d1, d0, true),
refCount: 1,
}
s.chunkDescs = append(s.chunkDescs, newHead) s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkPersisted = false s.headChunkPersisted = false
} }
@ -290,10 +180,7 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
queuePersist(s.head()) queuePersist(s.head())
for i, c := range chunks[1:] { for i, c := range chunks[1:] {
cd := &chunkDesc{ cd := newChunkDesc(c)
chunk: c,
refCount: 1,
}
s.chunkDescs = append(s.chunkDescs, cd) s.chunkDescs = append(s.chunkDescs, cd)
// The last chunk is still growing. // The last chunk is still growing.
if i < len(chunks[1:])-1 { if i < len(chunks[1:])-1 {
@ -344,6 +231,7 @@ func (s *memorySeries) evictOlderThan(
lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
if lenToKeep < len(s.chunkDescs) { if lenToKeep < len(s.chunkDescs) {
s.chunkDescsLoaded = false s.chunkDescsLoaded = false
chunkDescOps.WithLabelValues(evict).Add(float64(len(s.chunkDescs) - lenToKeep))
s.chunkDescs = append( s.chunkDescs = append(
make([]*chunkDesc, 0, lenToKeep), make([]*chunkDesc, 0, lenToKeep),
s.chunkDescs[len(s.chunkDescs)-lenToKeep:]..., s.chunkDescs[len(s.chunkDescs)-lenToKeep:]...,
@ -397,7 +285,7 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
for i := 0; i < keepIdx; i++ { for i := 0; i < keepIdx; i++ {
s.chunkDescs[i].evictOnUnpin() s.chunkDescs[i].evictOnUnpin()
} }
s.chunkDescs = s.chunkDescs[keepIdx:] s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
return len(s.chunkDescs) == 0 return len(s.chunkDescs) == 0
} }
@ -406,30 +294,30 @@ func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDes
loadIndexes := []int{} loadIndexes := []int{}
pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes))
for _, idx := range indexes { for _, idx := range indexes {
pinnedChunkDescs = append(pinnedChunkDescs, s.chunkDescs[idx]) cd := s.chunkDescs[idx]
if s.chunkDescs[idx].isEvicted() { pinnedChunkDescs = append(pinnedChunkDescs, cd)
cd.pin() // Have to pin everything first to prevent concurrent evictOnUnpin later.
if cd.isEvicted() {
loadIndexes = append(loadIndexes, idx) loadIndexes = append(loadIndexes, idx)
} else {
s.chunkDescs[idx].pin()
} }
} }
chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs)))
if len(loadIndexes) > 0 { if len(loadIndexes) > 0 {
fp := s.metric.Fingerprint() fp := s.metric.Fingerprint()
chunks, err := p.loadChunks(fp, loadIndexes) chunks, err := p.loadChunks(fp, loadIndexes)
if err != nil { if err != nil {
// Unpin any pinned chunks that were already loaded. // Unpin the chunks since we won't return them as pinned chunks now.
for _, cd := range pinnedChunkDescs { for _, cd := range pinnedChunkDescs {
if !cd.isEvicted() {
cd.unpin() cd.unpin()
} }
} chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs)))
return nil, err return nil, err
} }
for i, c := range chunks { for i, c := range chunks {
cd := s.chunkDescs[loadIndexes[i]] s.chunkDescs[loadIndexes[i]].setChunk(c)
cd.open(c)
} }
chunkOps.WithLabelValues(load).Add(float64(len(chunks)))
} }
return pinnedChunkDescs, nil return pinnedChunkDescs, nil
@ -472,7 +360,7 @@ func (s *memorySeries) preloadChunksForRange(
from clientmodel.Timestamp, through clientmodel.Timestamp, from clientmodel.Timestamp, through clientmodel.Timestamp,
fp clientmodel.Fingerprint, p *persistence, fp clientmodel.Fingerprint, p *persistence,
) ([]*chunkDesc, error) { ) ([]*chunkDesc, error) {
firstChunkDescTime := through firstChunkDescTime := clientmodel.Timestamp(math.MaxInt64)
if len(s.chunkDescs) > 0 { if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime() firstChunkDescTime = s.chunkDescs[0].firstTime()
} }
@ -516,6 +404,7 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
for i, cd := range s.chunkDescs { for i, cd := range s.chunkDescs {
if !cd.isEvicted() { if !cd.isEvicted() {
if i == len(s.chunkDescs)-1 { if i == len(s.chunkDescs)-1 {
chunkOps.WithLabelValues(clone).Inc()
chunks = append(chunks, cd.chunk.clone()) chunks = append(chunks, cd.chunk.clone())
} else { } else {
chunks = append(chunks, cd.chunk) chunks = append(chunks, cd.chunk)

View file

@ -19,14 +19,102 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
const persistQueueCap = 1024 const persistQueueCap = 1024
// Instrumentation.
var (
persistLatency = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_latency_microseconds",
Help: "A summary of latencies for persisting each chunk.",
})
persistErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_errors_total",
Help: "A counter of errors persisting chunks.",
},
[]string{errorLabel},
)
persistQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_queue_length",
Help: "The current number of chunks waiting in the persist queue.",
})
persistQueueCapacity = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_queue_capacity",
Help: "The total capacity of the persist queue.",
})
numSeries = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_series",
Help: "The current number of series in memory.",
})
seriesOps = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_ops_total",
Help: "The total number of series operations by their type.",
},
[]string{opTypeLabel},
)
ingestedSamplesCount = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "ingested_samples_total",
Help: "The total number of samples ingested.",
})
purgeDuration = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "purge_duration_milliseconds",
Help: "The duration of the last storage purge iteration in milliseconds.",
})
evictionDuration = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "eviction_duration_milliseconds",
Help: "The duration of the last memory eviction iteration in milliseconds.",
})
)
const (
// Op-types for seriesOps.
create = "create"
archive = "archive"
unarchive = "unarchive"
memoryPurge = "purge_from_memory"
archivePurge = "purge_from_archive"
)
func init() {
prometheus.MustRegister(persistLatency)
prometheus.MustRegister(persistErrors)
prometheus.MustRegister(persistQueueLength)
prometheus.MustRegister(persistQueueCapacity)
prometheus.MustRegister(numSeries)
prometheus.MustRegister(seriesOps)
prometheus.MustRegister(ingestedSamplesCount)
prometheus.MustRegister(purgeDuration)
prometheus.MustRegister(evictionDuration)
persistQueueCapacity.Set(float64(persistQueueCap))
}
type storageState uint type storageState uint
const ( const (
@ -108,7 +196,7 @@ func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
s.appendSample(sample) s.appendSample(sample)
} }
numSamples.Add(float64(len(samples))) ingestedSamplesCount.Add(float64(len(samples)))
} }
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
@ -130,14 +218,16 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
if err != nil { if err != nil {
glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err) glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
} }
if !unarchived { if unarchived {
seriesOps.WithLabelValues(unarchive).Inc()
} else {
// This was a genuinely new series, so index the metric. // This was a genuinely new series, so index the metric.
s.persistence.indexMetric(m, fp) s.persistence.indexMetric(m, fp)
seriesOps.WithLabelValues(create).Inc()
} }
series = newMemorySeries(m, !unarchived) series = newMemorySeries(m, !unarchived)
s.fingerprintToSeries.put(fp, series) s.fingerprintToSeries.put(fp, series)
numSeries.Set(float64(s.fingerprintToSeries.length())) numSeries.Inc()
} }
return series return series
} }
@ -204,7 +294,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
defer func(begin time.Time) { defer func(begin time.Time) {
evictionDuration.Set(float64(time.Since(begin) / time.Millisecond)) evictionDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond))
}(time.Now()) }(time.Now())
for m := range s.fingerprintToSeries.iter() { for m := range s.fingerprintToSeries.iter() {
@ -214,38 +304,33 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
m.fp, s.persistQueue, m.fp, s.persistQueue,
) { ) {
s.fingerprintToSeries.del(m.fp) s.fingerprintToSeries.del(m.fp)
numSeries.Dec()
if err := s.persistence.archiveMetric( if err := s.persistence.archiveMetric(
m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(), m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
); err != nil { ); err != nil {
glog.Errorf("Error archiving metric %v: %v", m.series.metric, err) glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
} else {
seriesOps.WithLabelValues(archive).Inc()
} }
} }
s.fpLocker.Unlock(m.fp) s.fpLocker.Unlock(m.fp)
} }
} }
func recordPersist(start time.Time, err error) {
outcome := success
if err != nil {
outcome = failure
}
persistLatencies.WithLabelValues(outcome).Observe(float64(time.Since(start) / time.Millisecond))
}
func (s *memorySeriesStorage) handlePersistQueue() { func (s *memorySeriesStorage) handlePersistQueue() {
for req := range s.persistQueue { for req := range s.persistQueue {
persistQueueLength.Set(float64(len(s.persistQueue))) persistQueueLength.Set(float64(len(s.persistQueue)))
//glog.Info("Persist request: ", *req.fingerprint)
start := time.Now() start := time.Now()
err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk) err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
recordPersist(start, err) persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil { if err != nil {
persistErrors.WithLabelValues(err.Error()).Inc()
glog.Error("Error persisting chunk, requeuing: ", err) glog.Error("Error persisting chunk, requeuing: ", err)
s.persistQueue <- req s.persistQueue <- req
continue continue
} }
req.chunkDesc.unpin() req.chunkDesc.unpin()
chunkOps.WithLabelValues(persistAndUnpin).Inc()
} }
s.persistDone <- true s.persistDone <- true
} }
@ -319,7 +404,7 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
s.purgeSeries(fp, ts) s.purgeSeries(fp, ts)
} }
} }
purgeDuration.Set(float64(time.Since(begin) / time.Millisecond)) purgeDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond))
glog.Info("Done purging old series data.") glog.Info("Done purging old series data.")
} }
} }
@ -342,6 +427,8 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
if series, ok := s.fingerprintToSeries.get(fp); ok { if series, ok := s.fingerprintToSeries.get(fp); ok {
if series.purgeOlderThan(beforeTime) && allDropped { if series.purgeOlderThan(beforeTime) && allDropped {
s.fingerprintToSeries.del(fp) s.fingerprintToSeries.del(fp)
numSeries.Dec()
seriesOps.WithLabelValues(memoryPurge).Inc()
s.persistence.unindexMetric(series.metric, fp) s.persistence.unindexMetric(series.metric, fp)
} }
return return
@ -349,13 +436,15 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
// If we arrive here, nothing was in memory, so the metric must have // If we arrive here, nothing was in memory, so the metric must have
// been archived. Drop the archived metric if there are no persisted // been archived. Drop the archived metric if there are no persisted
// chunks left. If we do drop the archived metric, we should update the // chunks left. If we don't drop the archived metric, we should update
// archivedFingerprintToTimeRange index according to the remaining // the archivedFingerprintToTimeRange index according to the remaining
// chunks, but it's probably not worth the effort. Queries going beyond // chunks, but it's probably not worth the effort. Queries going beyond
// the purge cut-off can be truncated in a more direct fashion. // the purge cut-off can be truncated in a more direct fashion.
if allDropped { if allDropped {
if err := s.persistence.dropArchivedMetric(fp); err != nil { if err := s.persistence.dropArchivedMetric(fp); err != nil {
glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err) glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
} else {
seriesOps.WithLabelValues(archivePurge).Inc()
} }
} }
} }

View file

@ -35,7 +35,7 @@ const (
// String constants for instrumentation. // String constants for instrumentation.
const ( const (
namespace = "prometheus" namespace = "prometheus"
subsystem = "remote_tsdb" subsystem = "remote_storage"
result = "result" result = "result"
success = "success" success = "success"