mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Add metrics to count inconsistencies and fp collisions.
This commit is contained in:
parent
c44e7cd105
commit
3b9ab546e6
|
@ -7,6 +7,7 @@ import (
|
|||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/log"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
@ -31,22 +32,31 @@ type fpMapper struct {
|
|||
|
||||
fpToSeries *seriesMap
|
||||
p *persistence
|
||||
|
||||
mappingsCounter prometheus.Counter
|
||||
}
|
||||
|
||||
// newFPMapper loads the collision map from the persistence and
|
||||
// returns an fpMapper ready to use.
|
||||
func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) {
|
||||
r := &fpMapper{
|
||||
m := &fpMapper{
|
||||
fpToSeries: fpToSeries,
|
||||
p: p,
|
||||
mappingsCounter: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "fingerprint_mappings_total",
|
||||
Help: "The total number of fingerprints being mapped to avoid collisions.",
|
||||
}),
|
||||
}
|
||||
mappings, nextFP, err := p.loadFPMappings()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.mappings = mappings
|
||||
r.highestMappedFP = nextFP
|
||||
return r, nil
|
||||
m.mappings = mappings
|
||||
m.mappingsCounter.Set(float64(len(m.mappings)))
|
||||
m.highestMappedFP = nextFP
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and
|
||||
|
@ -55,33 +65,33 @@ func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) {
|
|||
//
|
||||
// If an error is encountered, it is returned together with the unchanged raw
|
||||
// fingerprint.
|
||||
func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clientmodel.Fingerprint, error) {
|
||||
func (m *fpMapper) mapFP(fp clientmodel.Fingerprint, metric clientmodel.Metric) (clientmodel.Fingerprint, error) {
|
||||
// First check if we are in the reserved FP space, in which case this is
|
||||
// automatically a collision that has to be mapped.
|
||||
if fp <= maxMappedFP {
|
||||
return r.maybeAddMapping(fp, m)
|
||||
return m.maybeAddMapping(fp, metric)
|
||||
}
|
||||
|
||||
// Then check the most likely case: This fp belongs to a series that is
|
||||
// already in memory.
|
||||
s, ok := r.fpToSeries.get(fp)
|
||||
s, ok := m.fpToSeries.get(fp)
|
||||
if ok {
|
||||
// FP exists in memory, but is it for the same metric?
|
||||
if m.Equal(s.metric) {
|
||||
if metric.Equal(s.metric) {
|
||||
// Yupp. We are done.
|
||||
return fp, nil
|
||||
}
|
||||
// Collision detected!
|
||||
return r.maybeAddMapping(fp, m)
|
||||
return m.maybeAddMapping(fp, metric)
|
||||
}
|
||||
// Metric is not in memory. Before doing the expensive archive lookup,
|
||||
// check if we have a mapping for this metric in place already.
|
||||
r.mtx.RLock()
|
||||
mappedFPs, fpAlreadyMapped := r.mappings[fp]
|
||||
r.mtx.RUnlock()
|
||||
m.mtx.RLock()
|
||||
mappedFPs, fpAlreadyMapped := m.mappings[fp]
|
||||
m.mtx.RUnlock()
|
||||
if fpAlreadyMapped {
|
||||
// We indeed have mapped fp historically.
|
||||
ms := metricToUniqueString(m)
|
||||
ms := metricToUniqueString(metric)
|
||||
// fp is locked by the caller, so no further locking of
|
||||
// 'collisions' required (it is specific to fp).
|
||||
mappedFP, ok := mappedFPs[ms]
|
||||
|
@ -93,18 +103,18 @@ func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clie
|
|||
// If we are here, FP does not exist in memory and is either not mapped
|
||||
// at all, or existing mappings for FP are not for m. Check if we have
|
||||
// something for FP in the archive.
|
||||
archivedMetric, err := r.p.archivedMetric(fp)
|
||||
archivedMetric, err := m.p.archivedMetric(fp)
|
||||
if err != nil {
|
||||
return fp, err
|
||||
}
|
||||
if archivedMetric != nil {
|
||||
// FP exists in archive, but is it for the same metric?
|
||||
if m.Equal(archivedMetric) {
|
||||
if metric.Equal(archivedMetric) {
|
||||
// Yupp. We are done.
|
||||
return fp, nil
|
||||
}
|
||||
// Collision detected!
|
||||
return r.maybeAddMapping(fp, m)
|
||||
return m.maybeAddMapping(fp, metric)
|
||||
}
|
||||
// As fp does not exist, neither in memory nor in archive, we can safely
|
||||
// keep it unmapped.
|
||||
|
@ -114,14 +124,14 @@ func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clie
|
|||
// maybeAddMapping is only used internally. It takes a detected collision and
|
||||
// adds it to the collisions map if not yet there. In any case, it returns the
|
||||
// truly unique fingerprint for the colliding metric.
|
||||
func (r *fpMapper) maybeAddMapping(
|
||||
func (m *fpMapper) maybeAddMapping(
|
||||
fp clientmodel.Fingerprint,
|
||||
collidingMetric clientmodel.Metric,
|
||||
) (clientmodel.Fingerprint, error) {
|
||||
ms := metricToUniqueString(collidingMetric)
|
||||
r.mtx.RLock()
|
||||
mappedFPs, ok := r.mappings[fp]
|
||||
r.mtx.RUnlock()
|
||||
m.mtx.RLock()
|
||||
mappedFPs, ok := m.mappings[fp]
|
||||
m.mtx.RUnlock()
|
||||
if ok {
|
||||
// fp is locked by the caller, so no further locking required.
|
||||
mappedFP, ok := mappedFPs[ms]
|
||||
|
@ -129,12 +139,12 @@ func (r *fpMapper) maybeAddMapping(
|
|||
return mappedFP, nil // Existing mapping.
|
||||
}
|
||||
// A new mapping has to be created.
|
||||
mappedFP = r.nextMappedFP()
|
||||
mappedFP = m.nextMappedFP()
|
||||
mappedFPs[ms] = mappedFP
|
||||
r.mtx.RLock()
|
||||
m.mtx.RLock()
|
||||
// Checkpoint mappings after each change.
|
||||
err := r.p.checkpointFPMappings(r.mappings)
|
||||
r.mtx.RUnlock()
|
||||
err := m.p.checkpointFPMappings(m.mappings)
|
||||
m.mtx.RUnlock()
|
||||
log.Infof(
|
||||
"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
|
||||
fp, collidingMetric, mappedFP,
|
||||
|
@ -142,13 +152,14 @@ func (r *fpMapper) maybeAddMapping(
|
|||
return mappedFP, err
|
||||
}
|
||||
// This is the first collision for fp.
|
||||
mappedFP := r.nextMappedFP()
|
||||
mappedFP := m.nextMappedFP()
|
||||
mappedFPs = map[string]clientmodel.Fingerprint{ms: mappedFP}
|
||||
r.mtx.Lock()
|
||||
r.mappings[fp] = mappedFPs
|
||||
m.mtx.Lock()
|
||||
m.mappings[fp] = mappedFPs
|
||||
m.mappingsCounter.Inc()
|
||||
// Checkpoint mappings after each change.
|
||||
err := r.p.checkpointFPMappings(r.mappings)
|
||||
r.mtx.Unlock()
|
||||
err := m.p.checkpointFPMappings(m.mappings)
|
||||
m.mtx.Unlock()
|
||||
log.Infof(
|
||||
"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
|
||||
fp, collidingMetric, mappedFP,
|
||||
|
@ -156,14 +167,24 @@ func (r *fpMapper) maybeAddMapping(
|
|||
return mappedFP, err
|
||||
}
|
||||
|
||||
func (r *fpMapper) nextMappedFP() clientmodel.Fingerprint {
|
||||
mappedFP := clientmodel.Fingerprint(atomic.AddUint64((*uint64)(&r.highestMappedFP), 1))
|
||||
func (m *fpMapper) nextMappedFP() clientmodel.Fingerprint {
|
||||
mappedFP := clientmodel.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1))
|
||||
if mappedFP > maxMappedFP {
|
||||
panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
|
||||
}
|
||||
return mappedFP
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (m *fpMapper) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- m.mappingsCounter.Desc()
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (m *fpMapper) Collect(ch chan<- prometheus.Metric) {
|
||||
ch <- m.mappingsCounter
|
||||
}
|
||||
|
||||
// metricToUniqueString turns a metric into a string in a reproducible and
|
||||
// unique way, i.e. the same metric will always create the same string, and
|
||||
// different metrics will always create different strings. In a way, it is the
|
||||
|
|
|
@ -120,6 +120,7 @@ type persistence struct {
|
|||
indexingBatchSizes prometheus.Summary
|
||||
indexingBatchDuration prometheus.Summary
|
||||
checkpointDuration prometheus.Gauge
|
||||
dirtyCounter prometheus.Counter
|
||||
|
||||
dirtyMtx sync.Mutex // Protects dirty and becameDirty.
|
||||
dirty bool // true if persistence was started in dirty state.
|
||||
|
@ -237,6 +238,12 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync
|
|||
Name: "checkpoint_duration_milliseconds",
|
||||
Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
|
||||
}),
|
||||
dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "inconsistencies_total",
|
||||
Help: "A counter incremented each time an inconsistency in the local storage is detected. If this is greater zero, restart the server as soon as possible.",
|
||||
}),
|
||||
dirty: dirty,
|
||||
pedanticChecks: pedanticChecks,
|
||||
dirtyFileName: dirtyPath,
|
||||
|
@ -282,6 +289,7 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
|
|||
p.indexingBatchSizes.Describe(ch)
|
||||
p.indexingBatchDuration.Describe(ch)
|
||||
ch <- p.checkpointDuration.Desc()
|
||||
ch <- p.dirtyCounter.Desc()
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
|
@ -293,6 +301,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
|
|||
p.indexingBatchSizes.Collect(ch)
|
||||
p.indexingBatchDuration.Collect(ch)
|
||||
ch <- p.checkpointDuration
|
||||
ch <- p.dirtyCounter
|
||||
}
|
||||
|
||||
// isDirty returns the dirty flag in a goroutine-safe way.
|
||||
|
@ -307,6 +316,7 @@ func (p *persistence) isDirty() bool {
|
|||
// dirty during our runtime, there is no way back. If we were dirty from the
|
||||
// start, a clean-up might make us clean again.)
|
||||
func (p *persistence) setDirty(dirty bool) {
|
||||
p.dirtyCounter.Inc()
|
||||
p.dirtyMtx.Lock()
|
||||
defer p.dirtyMtx.Unlock()
|
||||
if p.becameDirty {
|
||||
|
|
|
@ -1003,6 +1003,7 @@ func (s *memorySeriesStorage) persistenceBacklogScore() float64 {
|
|||
// Describe implements prometheus.Collector.
|
||||
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
|
||||
s.persistence.Describe(ch)
|
||||
s.mapper.Describe(ch)
|
||||
|
||||
ch <- s.persistErrors.Desc()
|
||||
ch <- maxChunksToPersistDesc
|
||||
|
@ -1018,6 +1019,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
|
|||
// Collect implements prometheus.Collector.
|
||||
func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
|
||||
s.persistence.Collect(ch)
|
||||
s.mapper.Collect(ch)
|
||||
|
||||
ch <- s.persistErrors
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
|
|
Loading…
Reference in a new issue