mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-21 03:16:00 -08:00
Merge pull request #889 from prometheus/monotonicity
Ensure timestamp monotonicity within a series.
This commit is contained in:
commit
82e2fd8566
|
@ -250,17 +250,28 @@ func (p *persistence) sanitizeSeries(
|
||||||
// consistent with the checkpoint, so we have to take a closer
|
// consistent with the checkpoint, so we have to take a closer
|
||||||
// look.
|
// look.
|
||||||
if s.headChunkClosed {
|
if s.headChunkClosed {
|
||||||
// This is the easy case as we don't have any chunks in
|
// This is the easy case as we have all chunks on
|
||||||
// heads.db. Treat this series as a freshly unarchived
|
// disk. Treat this series as a freshly unarchived one
|
||||||
// one. No chunks or chunkDescs in memory, no current
|
// by loading the chunkDescs and setting all parameters
|
||||||
// head chunk.
|
// based on the loaded chunkDescs.
|
||||||
|
cds, err := p.loadChunkDescs(fp, clientmodel.Latest)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf(
|
||||||
|
"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
|
||||||
|
s.metric, fp, err,
|
||||||
|
)
|
||||||
|
purge()
|
||||||
|
return fp, false
|
||||||
|
}
|
||||||
log.Warnf(
|
log.Warnf(
|
||||||
"Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.",
|
"Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.",
|
||||||
s.metric, fp, chunksInFile,
|
s.metric, fp, len(cds),
|
||||||
)
|
)
|
||||||
s.chunkDescs = nil
|
s.chunkDescs = cds
|
||||||
s.chunkDescsOffset = chunksInFile
|
s.chunkDescsOffset = 0
|
||||||
s.persistWatermark = 0
|
s.savedFirstTime = cds[0].firstTime()
|
||||||
|
s.lastTime = cds[len(cds)-1].lastTime()
|
||||||
|
s.persistWatermark = len(cds)
|
||||||
s.modTime = modTime
|
s.modTime = modTime
|
||||||
return fp, true
|
return fp, true
|
||||||
}
|
}
|
||||||
|
@ -275,8 +286,8 @@ func (p *persistence) sanitizeSeries(
|
||||||
// First, throw away the chunkDescs without chunks.
|
// First, throw away the chunkDescs without chunks.
|
||||||
s.chunkDescs = s.chunkDescs[s.persistWatermark:]
|
s.chunkDescs = s.chunkDescs[s.persistWatermark:]
|
||||||
numMemChunkDescs.Sub(float64(s.persistWatermark))
|
numMemChunkDescs.Sub(float64(s.persistWatermark))
|
||||||
// Load all the chunk descs (which assumes we have none from the future).
|
// Load all the chunk descs.
|
||||||
cds, err := p.loadChunkDescs(fp, clientmodel.Now())
|
cds, err := p.loadChunkDescs(fp, clientmodel.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf(
|
log.Errorf(
|
||||||
"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
|
"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
|
||||||
|
@ -287,6 +298,7 @@ func (p *persistence) sanitizeSeries(
|
||||||
}
|
}
|
||||||
s.persistWatermark = len(cds)
|
s.persistWatermark = len(cds)
|
||||||
s.chunkDescsOffset = 0
|
s.chunkDescsOffset = 0
|
||||||
|
s.savedFirstTime = cds[0].firstTime()
|
||||||
s.modTime = modTime
|
s.modTime = modTime
|
||||||
|
|
||||||
lastTime := cds[len(cds)-1].lastTime()
|
lastTime := cds[len(cds)-1].lastTime()
|
||||||
|
@ -395,14 +407,11 @@ func (p *persistence) cleanUpArchiveIndexes(
|
||||||
if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
|
if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest)
|
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Latest)
|
||||||
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
series.chunkDescs = cds
|
series := newMemorySeries(clientmodel.Metric(m), cds, p.seriesFileModTime(clientmodel.Fingerprint(fp)))
|
||||||
series.chunkDescsOffset = 0
|
|
||||||
series.persistWatermark = len(cds)
|
|
||||||
fpToSeries[clientmodel.Fingerprint(fp)] = series
|
fpToSeries[clientmodel.Fingerprint(fp)] = series
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
|
|
@ -94,9 +94,11 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
|
||||||
}
|
}
|
||||||
|
|
||||||
baseValue := c.baseValue()
|
baseValue := c.baseValue()
|
||||||
// TODO(beorn7): Once https://github.com/prometheus/prometheus/issues/481 is
|
|
||||||
// fixed, we should panic here if dt is negative.
|
|
||||||
dt := s.Timestamp - c.baseTime()
|
dt := s.Timestamp - c.baseTime()
|
||||||
|
if dt < 0 {
|
||||||
|
panic("time delta is less than zero")
|
||||||
|
}
|
||||||
|
|
||||||
dv := s.Value - baseValue
|
dv := s.Value - baseValue
|
||||||
tb := c.timeBytes()
|
tb := c.timeBytes()
|
||||||
vb := c.valueBytes()
|
vb := c.valueBytes()
|
||||||
|
@ -382,7 +384,7 @@ func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel.Times
|
||||||
// Take absolute value for d8.
|
// Take absolute value for d8.
|
||||||
return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
|
return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
|
||||||
default:
|
default:
|
||||||
panic("Invalid number of bytes for time delta")
|
panic("invalid number of bytes for time delta")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,7 +409,7 @@ func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.Sam
|
||||||
return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
// No d8 for ints.
|
// No d8 for ints.
|
||||||
default:
|
default:
|
||||||
panic("Invalid number of bytes for integer delta")
|
panic("invalid number of bytes for integer delta")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
switch it.vBytes {
|
switch it.vBytes {
|
||||||
|
@ -417,7 +419,7 @@ func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.Sam
|
||||||
// Take absolute value for d8.
|
// Take absolute value for d8.
|
||||||
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
||||||
default:
|
default:
|
||||||
panic("Invalid number of bytes for floating point delta")
|
panic("invalid number of bytes for floating point delta")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -339,10 +339,7 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk {
|
||||||
func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb deltaBytes) []chunk {
|
func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb deltaBytes) []chunk {
|
||||||
baseTimeDelta := s.Timestamp - c.baseTime()
|
baseTimeDelta := s.Timestamp - c.baseTime()
|
||||||
if baseTimeDelta < 0 {
|
if baseTimeDelta < 0 {
|
||||||
// TODO(beorn7): We ignore this irregular case for now. Once
|
panic("base time delta is less than zero")
|
||||||
// https://github.com/prometheus/prometheus/issues/481 is
|
|
||||||
// fixed, we should panic here instead.
|
|
||||||
return []chunk{&c}
|
|
||||||
}
|
}
|
||||||
c = c[:doubleDeltaHeaderBytes]
|
c = c[:doubleDeltaHeaderBytes]
|
||||||
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
|
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
|
||||||
|
@ -511,7 +508,7 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel
|
||||||
// Take absolute value for d8.
|
// Take absolute value for d8.
|
||||||
return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
|
return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
|
||||||
default:
|
default:
|
||||||
panic("Invalid number of bytes for time delta")
|
panic("invalid number of bytes for time delta")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -555,7 +552,7 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmod
|
||||||
clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
// No d8 for ints.
|
// No d8 for ints.
|
||||||
default:
|
default:
|
||||||
panic("Invalid number of bytes for integer delta")
|
panic("invalid number of bytes for integer delta")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
switch it.vBytes {
|
switch it.vBytes {
|
||||||
|
@ -567,7 +564,7 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmod
|
||||||
// Take absolute value for d8.
|
// Take absolute value for d8.
|
||||||
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
||||||
default:
|
default:
|
||||||
panic("Invalid number of bytes for floating point delta")
|
panic("invalid number of bytes for floating point delta")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,11 +109,12 @@ func (l *LevelDB) Delete(key encoding.BinaryMarshaler) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
err = l.storage.Delete(k, l.writeOpts)
|
// Note that Delete returns nil if k does not exist. So we have to test
|
||||||
if err == leveldb.ErrNotFound {
|
// for existence with Has first.
|
||||||
return false, nil
|
if has, err := l.storage.Has(k, l.readOpts); !has || err != nil {
|
||||||
|
return false, err
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err = l.storage.Delete(k, l.writeOpts); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
|
|
|
@ -643,7 +643,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// This is the non-persisted head chunk. Fully marshal it.
|
// This is a non-persisted chunk. Fully marshal it.
|
||||||
if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil {
|
if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -853,6 +853,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
|
||||||
modTime: modTime,
|
modTime: modTime,
|
||||||
chunkDescsOffset: int(chunkDescsOffset),
|
chunkDescsOffset: int(chunkDescsOffset),
|
||||||
savedFirstTime: clientmodel.Timestamp(savedFirstTime),
|
savedFirstTime: clientmodel.Timestamp(savedFirstTime),
|
||||||
|
lastTime: chunkDescs[len(chunkDescs)-1].lastTime(),
|
||||||
headChunkClosed: persistWatermark >= numChunkDescs,
|
headChunkClosed: persistWatermark >= numChunkDescs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1173,38 +1174,27 @@ func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error
|
||||||
|
|
||||||
// unarchiveMetric deletes an archived fingerprint and its metric, but (in
|
// unarchiveMetric deletes an archived fingerprint and its metric, but (in
|
||||||
// contrast to purgeArchivedMetric) does not un-index the metric. If a metric
|
// contrast to purgeArchivedMetric) does not un-index the metric. If a metric
|
||||||
// was actually deleted, the method returns true and the first time of the
|
// was actually deleted, the method returns true and the first time and last
|
||||||
// deleted metric. The caller must have locked the fingerprint.
|
// time of the deleted metric. The caller must have locked the fingerprint.
|
||||||
func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (
|
func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (deletedAnything bool, err error) {
|
||||||
deletedAnything bool,
|
|
||||||
firstDeletedTime clientmodel.Timestamp,
|
|
||||||
err error,
|
|
||||||
) {
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.setDirty(true)
|
p.setDirty(true)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
firstTime, _, has, err := p.archivedFingerprintToTimeRange.Lookup(fp)
|
|
||||||
if err != nil || !has {
|
|
||||||
return false, firstTime, err
|
|
||||||
}
|
|
||||||
deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
|
deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
|
||||||
if err != nil {
|
if err != nil || !deleted {
|
||||||
return false, firstTime, err
|
return false, err
|
||||||
}
|
|
||||||
if !deleted {
|
|
||||||
log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToMetrics index. This should never happen.", fp)
|
|
||||||
}
|
}
|
||||||
deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp))
|
deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, firstTime, err
|
return false, err
|
||||||
}
|
}
|
||||||
if !deleted {
|
if !deleted {
|
||||||
log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp)
|
log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp)
|
||||||
}
|
}
|
||||||
return true, firstTime, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// close flushes the indexing queue and other buffered data and releases any
|
// close flushes the indexing queue and other buffered data and releases any
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
clientmodel "github.com/prometheus/client_golang/model"
|
clientmodel "github.com/prometheus/client_golang/model"
|
||||||
|
|
||||||
|
@ -354,11 +355,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
|
||||||
|
|
||||||
fpLocker := newFingerprintLocker(10)
|
fpLocker := newFingerprintLocker(10)
|
||||||
sm := newSeriesMap()
|
sm := newSeriesMap()
|
||||||
s1 := newMemorySeries(m1, true, 0)
|
s1 := newMemorySeries(m1, nil, time.Time{})
|
||||||
s2 := newMemorySeries(m2, false, 0)
|
s2 := newMemorySeries(m2, nil, time.Time{})
|
||||||
s3 := newMemorySeries(m3, false, 0)
|
s3 := newMemorySeries(m3, nil, time.Time{})
|
||||||
s4 := newMemorySeries(m4, true, 0)
|
s4 := newMemorySeries(m4, nil, time.Time{})
|
||||||
s5 := newMemorySeries(m5, true, 0)
|
s5 := newMemorySeries(m5, nil, time.Time{})
|
||||||
s1.add(&metric.SamplePair{Timestamp: 1, Value: 3.14})
|
s1.add(&metric.SamplePair{Timestamp: 1, Value: 3.14})
|
||||||
s3.add(&metric.SamplePair{Timestamp: 2, Value: 2.7})
|
s3.add(&metric.SamplePair{Timestamp: 2, Value: 2.7})
|
||||||
s3.headChunkClosed = true
|
s3.headChunkClosed = true
|
||||||
|
@ -416,8 +417,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
|
||||||
if loadedS3.head().c != nil {
|
if loadedS3.head().c != nil {
|
||||||
t.Error("head chunk not evicted")
|
t.Error("head chunk not evicted")
|
||||||
}
|
}
|
||||||
if loadedS3.chunkDescsOffset != -1 {
|
if loadedS3.chunkDescsOffset != 0 {
|
||||||
t.Errorf("want chunkDescsOffset -1, got %d", loadedS3.chunkDescsOffset)
|
t.Errorf("want chunkDescsOffset 0, got %d", loadedS3.chunkDescsOffset)
|
||||||
}
|
}
|
||||||
if !loadedS3.headChunkClosed {
|
if !loadedS3.headChunkClosed {
|
||||||
t.Error("headChunkClosed is false")
|
t.Error("headChunkClosed is false")
|
||||||
|
@ -546,22 +547,19 @@ func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unarchived, firstTime, err := p.unarchiveMetric(1)
|
unarchived, err := p.unarchiveMetric(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !unarchived {
|
if !unarchived {
|
||||||
t.Fatal("expected actual unarchival")
|
t.Error("expected actual unarchival")
|
||||||
}
|
}
|
||||||
if firstTime != 2 {
|
unarchived, err = p.unarchiveMetric(1)
|
||||||
t.Errorf("expected first time 2, got %v", firstTime)
|
|
||||||
}
|
|
||||||
unarchived, firstTime, err = p.unarchiveMetric(1)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if unarchived {
|
if unarchived {
|
||||||
t.Fatal("expected no unarchival")
|
t.Error("expected no unarchival")
|
||||||
}
|
}
|
||||||
|
|
||||||
expectedFPs = map[clientmodel.Timestamp][]clientmodel.Fingerprint{
|
expectedFPs = map[clientmodel.Timestamp][]clientmodel.Fingerprint{
|
||||||
|
@ -831,16 +829,13 @@ func testIndexing(t *testing.T, encoding chunkEncoding) {
|
||||||
verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p)
|
verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p)
|
||||||
for fp, m := range b.fpToMetric {
|
for fp, m := range b.fpToMetric {
|
||||||
p.unindexMetric(fp, m)
|
p.unindexMetric(fp, m)
|
||||||
unarchived, firstTime, err := p.unarchiveMetric(fp)
|
unarchived, err := p.unarchiveMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !unarchived {
|
if !unarchived {
|
||||||
t.Errorf("%d. metric not unarchived", i)
|
t.Errorf("%d. metric not unarchived", i)
|
||||||
}
|
}
|
||||||
if firstTime != 1 {
|
|
||||||
t.Errorf("%d. expected firstTime=1, got %v", i, firstTime)
|
|
||||||
}
|
|
||||||
delete(indexedFpsToMetrics, fp)
|
delete(indexedFpsToMetrics, fp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,6 +162,9 @@ type memorySeries struct {
|
||||||
// first chunk before its chunk desc is evicted. In doubt, this field is
|
// first chunk before its chunk desc is evicted. In doubt, this field is
|
||||||
// just set to the oldest possible timestamp.
|
// just set to the oldest possible timestamp.
|
||||||
savedFirstTime clientmodel.Timestamp
|
savedFirstTime clientmodel.Timestamp
|
||||||
|
// The timestamp of the last sample in this series. Needed for fast access to
|
||||||
|
// ensure timestamp monotonicity during ingestion.
|
||||||
|
lastTime clientmodel.Timestamp
|
||||||
// Whether the current head chunk has already been finished. If true,
|
// Whether the current head chunk has already been finished. If true,
|
||||||
// the current head chunk must not be modified anymore.
|
// the current head chunk must not be modified anymore.
|
||||||
headChunkClosed bool
|
headChunkClosed bool
|
||||||
|
@ -175,28 +178,29 @@ type memorySeries struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newMemorySeries returns a pointer to a newly allocated memorySeries for the
|
// newMemorySeries returns a pointer to a newly allocated memorySeries for the
|
||||||
// given metric. reallyNew defines if the memorySeries is a genuinely new series
|
// given metric. chunkDescs and modTime in the new series are set according to
|
||||||
// or (if false) a series for a metric being unarchived, i.e. a series that
|
// the provided parameters. chunkDescs can be nil or empty if this is a
|
||||||
// existed before but has been evicted from memory. If reallyNew is false,
|
// genuinely new time series (i.e. not one that is being unarchived). In that
|
||||||
// firstTime is ignored (and set to the lowest possible timestamp instead - it
|
// case, headChunkClosed is set to false, and firstTime and lastTime are both
|
||||||
// will be set properly upon the first eviction of chunkDescs).
|
// set to clientmodel.Earliest. The zero value for modTime can be used if the
|
||||||
func newMemorySeries(
|
// modification time of the series file is unknown (e.g. if this is a genuinely
|
||||||
m clientmodel.Metric,
|
// new series).
|
||||||
reallyNew bool,
|
func newMemorySeries(m clientmodel.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries {
|
||||||
firstTime clientmodel.Timestamp,
|
firstTime := clientmodel.Earliest
|
||||||
) *memorySeries {
|
lastTime := clientmodel.Earliest
|
||||||
if !reallyNew {
|
if len(chunkDescs) > 0 {
|
||||||
firstTime = clientmodel.Earliest
|
firstTime = chunkDescs[0].firstTime()
|
||||||
|
lastTime = chunkDescs[len(chunkDescs)-1].lastTime()
|
||||||
}
|
}
|
||||||
s := memorySeries{
|
return &memorySeries{
|
||||||
metric: m,
|
metric: m,
|
||||||
headChunkClosed: !reallyNew,
|
chunkDescs: chunkDescs,
|
||||||
savedFirstTime: firstTime,
|
headChunkClosed: len(chunkDescs) > 0,
|
||||||
|
savedFirstTime: firstTime,
|
||||||
|
lastTime: lastTime,
|
||||||
|
persistWatermark: len(chunkDescs),
|
||||||
|
modTime: modTime,
|
||||||
}
|
}
|
||||||
if !reallyNew {
|
|
||||||
s.chunkDescsOffset = -1
|
|
||||||
}
|
|
||||||
return &s
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// add adds a sample pair to the series. It returns the number of newly
|
// add adds a sample pair to the series. It returns the number of newly
|
||||||
|
@ -231,6 +235,8 @@ func (s *memorySeries) add(v *metric.SamplePair) int {
|
||||||
for _, c := range chunks[1:] {
|
for _, c := range chunks[1:] {
|
||||||
s.chunkDescs = append(s.chunkDescs, newChunkDesc(c))
|
s.chunkDescs = append(s.chunkDescs, newChunkDesc(c))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.lastTime = v.Timestamp
|
||||||
return len(chunks) - 1
|
return len(chunks) - 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +249,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool {
|
||||||
if s.headChunkClosed {
|
if s.headChunkClosed {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if time.Now().Sub(s.head().lastTime().Time()) > headChunkTimeout {
|
if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout {
|
||||||
s.headChunkClosed = true
|
s.headChunkClosed = true
|
||||||
// Since we cannot modify the head chunk from now on, we
|
// Since we cannot modify the head chunk from now on, we
|
||||||
// don't need to bother with cloning anymore.
|
// don't need to bother with cloning anymore.
|
||||||
|
|
|
@ -135,6 +135,7 @@ type memorySeriesStorage struct {
|
||||||
numSeries prometheus.Gauge
|
numSeries prometheus.Gauge
|
||||||
seriesOps *prometheus.CounterVec
|
seriesOps *prometheus.CounterVec
|
||||||
ingestedSamplesCount prometheus.Counter
|
ingestedSamplesCount prometheus.Counter
|
||||||
|
outOfOrderSamplesCount prometheus.Counter
|
||||||
invalidPreloadRequestsCount prometheus.Counter
|
invalidPreloadRequestsCount prometheus.Counter
|
||||||
maintainSeriesDuration *prometheus.SummaryVec
|
maintainSeriesDuration *prometheus.SummaryVec
|
||||||
}
|
}
|
||||||
|
@ -203,6 +204,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
|
||||||
Name: "ingested_samples_total",
|
Name: "ingested_samples_total",
|
||||||
Help: "The total number of samples ingested.",
|
Help: "The total number of samples ingested.",
|
||||||
}),
|
}),
|
||||||
|
outOfOrderSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "out_of_order_samples_total",
|
||||||
|
Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
|
||||||
|
}),
|
||||||
invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
|
invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
|
@ -550,6 +557,13 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
|
||||||
s.fpLocker.Lock(fp)
|
s.fpLocker.Lock(fp)
|
||||||
}
|
}
|
||||||
series := s.getOrCreateSeries(fp, sample.Metric)
|
series := s.getOrCreateSeries(fp, sample.Metric)
|
||||||
|
|
||||||
|
if sample.Timestamp <= series.lastTime {
|
||||||
|
s.fpLocker.Unlock(fp)
|
||||||
|
log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime)
|
||||||
|
s.outOfOrderSamplesCount.Inc()
|
||||||
|
return
|
||||||
|
}
|
||||||
completedChunksCount := series.add(&metric.SamplePair{
|
completedChunksCount := series.add(&metric.SamplePair{
|
||||||
Value: sample.Value,
|
Value: sample.Value,
|
||||||
Timestamp: sample.Timestamp,
|
Timestamp: sample.Timestamp,
|
||||||
|
@ -562,18 +576,30 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
|
||||||
func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
|
func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
|
||||||
series, ok := s.fpToSeries.get(fp)
|
series, ok := s.fpToSeries.get(fp)
|
||||||
if !ok {
|
if !ok {
|
||||||
unarchived, firstTime, err := s.persistence.unarchiveMetric(fp)
|
var cds []*chunkDesc
|
||||||
|
var modTime time.Time
|
||||||
|
unarchived, err := s.persistence.unarchiveMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
|
log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err)
|
||||||
}
|
}
|
||||||
if unarchived {
|
if unarchived {
|
||||||
s.seriesOps.WithLabelValues(unarchive).Inc()
|
s.seriesOps.WithLabelValues(unarchive).Inc()
|
||||||
|
// We have to load chunkDescs anyway to do anything with
|
||||||
|
// the series, so let's do it right now so that we don't
|
||||||
|
// end up with a series without any chunkDescs for a
|
||||||
|
// while (which is confusing as it makes the series
|
||||||
|
// appear as archived or purged).
|
||||||
|
cds, err = s.loadChunkDescs(fp, clientmodel.Latest)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err)
|
||||||
|
}
|
||||||
|
modTime = s.persistence.seriesFileModTime(fp)
|
||||||
} else {
|
} else {
|
||||||
// This was a genuinely new series, so index the metric.
|
// This was a genuinely new series, so index the metric.
|
||||||
s.persistence.indexMetric(fp, m)
|
s.persistence.indexMetric(fp, m)
|
||||||
s.seriesOps.WithLabelValues(create).Inc()
|
s.seriesOps.WithLabelValues(create).Inc()
|
||||||
}
|
}
|
||||||
series = newMemorySeries(m, !unarchived, firstTime)
|
series = newMemorySeries(m, cds, modTime)
|
||||||
s.fpToSeries.put(fp, series)
|
s.fpToSeries.put(fp, series)
|
||||||
s.numSeries.Inc()
|
s.numSeries.Inc()
|
||||||
}
|
}
|
||||||
|
@ -943,21 +969,8 @@ func (s *memorySeriesStorage) maintainMemorySeries(
|
||||||
if iOldestNotEvicted == -1 {
|
if iOldestNotEvicted == -1 {
|
||||||
s.fpToSeries.del(fp)
|
s.fpToSeries.del(fp)
|
||||||
s.numSeries.Dec()
|
s.numSeries.Dec()
|
||||||
// Make sure we have a head chunk descriptor (a freshly
|
|
||||||
// unarchived series has none).
|
|
||||||
if len(series.chunkDescs) == 0 {
|
|
||||||
cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf(
|
|
||||||
"Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v",
|
|
||||||
series.metric, err,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
series.chunkDescs = cds
|
|
||||||
}
|
|
||||||
if err := s.persistence.archiveMetric(
|
if err := s.persistence.archiveMetric(
|
||||||
fp, series.metric, series.firstTime(), series.head().lastTime(),
|
fp, series.metric, series.firstTime(), series.lastTime,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
log.Errorf("Error archiving metric %v: %v", series.metric, err)
|
log.Errorf("Error archiving metric %v: %v", series.metric, err)
|
||||||
return
|
return
|
||||||
|
@ -1154,6 +1167,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
|
||||||
ch <- s.numSeries.Desc()
|
ch <- s.numSeries.Desc()
|
||||||
s.seriesOps.Describe(ch)
|
s.seriesOps.Describe(ch)
|
||||||
ch <- s.ingestedSamplesCount.Desc()
|
ch <- s.ingestedSamplesCount.Desc()
|
||||||
|
ch <- s.outOfOrderSamplesCount.Desc()
|
||||||
ch <- s.invalidPreloadRequestsCount.Desc()
|
ch <- s.invalidPreloadRequestsCount.Desc()
|
||||||
ch <- numMemChunksDesc
|
ch <- numMemChunksDesc
|
||||||
s.maintainSeriesDuration.Describe(ch)
|
s.maintainSeriesDuration.Describe(ch)
|
||||||
|
@ -1178,6 +1192,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
|
||||||
ch <- s.numSeries
|
ch <- s.numSeries
|
||||||
s.seriesOps.Collect(ch)
|
s.seriesOps.Collect(ch)
|
||||||
ch <- s.ingestedSamplesCount
|
ch <- s.ingestedSamplesCount
|
||||||
|
ch <- s.outOfOrderSamplesCount
|
||||||
ch <- s.invalidPreloadRequestsCount
|
ch <- s.invalidPreloadRequestsCount
|
||||||
ch <- prometheus.MustNewConstMetric(
|
ch <- prometheus.MustNewConstMetric(
|
||||||
numMemChunksDesc,
|
numMemChunksDesc,
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"testing/quick"
|
"testing/quick"
|
||||||
"time"
|
"time"
|
||||||
|
@ -377,7 +378,7 @@ func TestRetentionCutoff(t *testing.T) {
|
||||||
s.WaitForIndexing()
|
s.WaitForIndexing()
|
||||||
|
|
||||||
var fp clientmodel.Fingerprint
|
var fp clientmodel.Fingerprint
|
||||||
for f := range s.fingerprintsForLabelPairs(metric.LabelPair{"job", "test"}) {
|
for f := range s.fingerprintsForLabelPairs(metric.LabelPair{Name: "job", Value: "test"}) {
|
||||||
fp = f
|
fp = f
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -398,7 +399,7 @@ func TestRetentionCutoff(t *testing.T) {
|
||||||
t.Errorf("unexpected result for timestamp before retention period")
|
t.Errorf("unexpected result for timestamp before retention period")
|
||||||
}
|
}
|
||||||
|
|
||||||
vals = it.RangeValues(metric.Interval{insertStart, now})
|
vals = it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
|
||||||
// We get 59 values here because the clientmodel.Now() is slightly later
|
// We get 59 values here because the clientmodel.Now() is slightly later
|
||||||
// than our now.
|
// than our now.
|
||||||
if len(vals) != 59 {
|
if len(vals) != 59 {
|
||||||
|
@ -408,7 +409,7 @@ func TestRetentionCutoff(t *testing.T) {
|
||||||
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
|
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
|
||||||
}
|
}
|
||||||
|
|
||||||
vals = it.BoundaryValues(metric.Interval{insertStart, now})
|
vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
|
||||||
if len(vals) != 2 {
|
if len(vals) != 2 {
|
||||||
t.Errorf("expected 2 values but got %d", len(vals))
|
t.Errorf("expected 2 values but got %d", len(vals))
|
||||||
}
|
}
|
||||||
|
@ -441,7 +442,7 @@ func TestDropMetrics(t *testing.T) {
|
||||||
}
|
}
|
||||||
s.WaitForIndexing()
|
s.WaitForIndexing()
|
||||||
|
|
||||||
fps := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
|
fps := s.fingerprintsForLabelPairs(metric.LabelPair{Name: clientmodel.MetricNameLabel, Value: "test"})
|
||||||
if len(fps) != 2 {
|
if len(fps) != 2 {
|
||||||
t.Fatalf("unexpected number of fingerprints: %d", len(fps))
|
t.Fatalf("unexpected number of fingerprints: %d", len(fps))
|
||||||
}
|
}
|
||||||
|
@ -449,7 +450,7 @@ func TestDropMetrics(t *testing.T) {
|
||||||
var fpList clientmodel.Fingerprints
|
var fpList clientmodel.Fingerprints
|
||||||
for fp := range fps {
|
for fp := range fps {
|
||||||
it := s.NewIterator(fp)
|
it := s.NewIterator(fp)
|
||||||
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
|
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
|
||||||
t.Fatalf("unexpected number of samples: %d", len(vals))
|
t.Fatalf("unexpected number of samples: %d", len(vals))
|
||||||
}
|
}
|
||||||
fpList = append(fpList, fp)
|
fpList = append(fpList, fp)
|
||||||
|
@ -458,34 +459,38 @@ func TestDropMetrics(t *testing.T) {
|
||||||
s.DropMetricsForFingerprints(fpList[0])
|
s.DropMetricsForFingerprints(fpList[0])
|
||||||
s.WaitForIndexing()
|
s.WaitForIndexing()
|
||||||
|
|
||||||
fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
|
fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{
|
||||||
|
Name: clientmodel.MetricNameLabel, Value: "test",
|
||||||
|
})
|
||||||
if len(fps2) != 1 {
|
if len(fps2) != 1 {
|
||||||
t.Fatalf("unexpected number of fingerprints: %d", len(fps2))
|
t.Fatalf("unexpected number of fingerprints: %d", len(fps2))
|
||||||
}
|
}
|
||||||
|
|
||||||
it := s.NewIterator(fpList[0])
|
it := s.NewIterator(fpList[0])
|
||||||
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
|
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
|
||||||
t.Fatalf("unexpected number of samples: %d", len(vals))
|
t.Fatalf("unexpected number of samples: %d", len(vals))
|
||||||
}
|
}
|
||||||
it = s.NewIterator(fpList[1])
|
it = s.NewIterator(fpList[1])
|
||||||
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
|
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
|
||||||
t.Fatalf("unexpected number of samples: %d", len(vals))
|
t.Fatalf("unexpected number of samples: %d", len(vals))
|
||||||
}
|
}
|
||||||
|
|
||||||
s.DropMetricsForFingerprints(fpList...)
|
s.DropMetricsForFingerprints(fpList...)
|
||||||
s.WaitForIndexing()
|
s.WaitForIndexing()
|
||||||
|
|
||||||
fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
|
fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{
|
||||||
|
Name: clientmodel.MetricNameLabel, Value: "test",
|
||||||
|
})
|
||||||
if len(fps3) != 0 {
|
if len(fps3) != 0 {
|
||||||
t.Fatalf("unexpected number of fingerprints: %d", len(fps3))
|
t.Fatalf("unexpected number of fingerprints: %d", len(fps3))
|
||||||
}
|
}
|
||||||
|
|
||||||
it = s.NewIterator(fpList[0])
|
it = s.NewIterator(fpList[0])
|
||||||
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
|
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
|
||||||
t.Fatalf("unexpected number of samples: %d", len(vals))
|
t.Fatalf("unexpected number of samples: %d", len(vals))
|
||||||
}
|
}
|
||||||
it = s.NewIterator(fpList[1])
|
it = s.NewIterator(fpList[1])
|
||||||
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
|
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
|
||||||
t.Fatalf("unexpected number of samples: %d", len(vals))
|
t.Fatalf("unexpected number of samples: %d", len(vals))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1405,3 +1410,50 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Sam
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAppendOutOfOrder(t *testing.T) {
|
||||||
|
s, closer := NewTestStorage(t, 1)
|
||||||
|
defer closer.Close()
|
||||||
|
|
||||||
|
m := clientmodel.Metric{
|
||||||
|
clientmodel.MetricNameLabel: "out_of_order",
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, t := range []int{0, 2, 2, 1} {
|
||||||
|
s.Append(&clientmodel.Sample{
|
||||||
|
Metric: m,
|
||||||
|
Timestamp: clientmodel.Timestamp(t),
|
||||||
|
Value: clientmodel.SampleValue(i),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fp, err := s.mapper.mapFP(m.FastFingerprint(), m)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pl := s.NewPreloader()
|
||||||
|
defer pl.Close()
|
||||||
|
|
||||||
|
err = pl.PreloadRange(fp, 0, 2, 5*time.Minute)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error preloading chunks: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
it := s.NewIterator(fp)
|
||||||
|
|
||||||
|
want := metric.Values{
|
||||||
|
{
|
||||||
|
Timestamp: 0,
|
||||||
|
Value: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Timestamp: 2,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
got := it.RangeValues(metric.Interval{OldestInclusive: 0, NewestInclusive: 2})
|
||||||
|
if !reflect.DeepEqual(want, got) {
|
||||||
|
t.Fatalf("want %v, got %v", want, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue