storage: ensure timestamp monotonicity within series.

Fixes https://github.com/prometheus/prometheus/issues/481

While doing so, clean up and fix a few other things:

- Fix `go vet` warnings (@fabxc to blame ;).

- Fix a racy problem with unarchiving: Whenever we unarchive a
  series, we essentially want to do something with it. However, until
  we have done something with it, it appears like a series that is
  ready to be archived or even purged. So e.g. it would be ignored
  during checkpointing. With this fix, we always load the chunkDescs
  upon unarchiving. This is wasteful if we only want to add a new
  sample to an archived time series, but the (presumably more common)
  case where we access an archived time series in a query doesn't
  become more expensive.

- The change above streamlined the getOrCreateSeries and
  newMemorySeries flow. Also, the modTime is now always set correctly.

- Fix the leveldb-backed implementation of KeyValueStore.Delete. It
  incorrectly returned `true, nil` when a non-existent key was passed
  in.
beorn7 2015-07-13 21:12:27 +02:00
parent 1c25247a75
commit ff08f0b6fe
9 changed files with 185 additions and 114 deletions
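
The core of the change is the guard added to memorySeriesStorage.Append (see the storage.go hunk below): a sample whose timestamp is not strictly after the series' lastTime is dropped, logged, and counted in out_of_order_samples_total. A minimal, self-contained sketch of that gate, with stand-in types rather than the actual Prometheus storage types:

package main

import (
	"fmt"
	"math"
)

// Stand-in for clientmodel.Earliest: the lowest possible timestamp.
const earliest = math.MinInt64

type series struct {
	lastTime int64 // timestamp of the most recently appended sample
}

type sample struct {
	timestamp int64
	value     float64
}

// appendSample mirrors the new check in Append: a sample at or before
// lastTime is rejected, so timestamps within a series stay strictly
// monotonically increasing.
func (s *series) appendSample(smp sample) error {
	if smp.timestamp <= s.lastTime {
		return fmt.Errorf("out-of-order sample: %d is not after %d", smp.timestamp, s.lastTime)
	}
	// ... write the sample into the head chunk here ...
	s.lastTime = smp.timestamp
	return nil
}

func main() {
	s := &series{lastTime: earliest}
	// Same pattern as TestAppendOutOfOrder below: 0 and the first 2
	// are kept, the second 2 and the 1 are dropped.
	for _, ts := range []int64{0, 2, 2, 1} {
		if err := s.appendSample(sample{timestamp: ts}); err != nil {
			fmt.Println("dropped:", err)
		}
	}
}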


@@ -250,17 +250,28 @@ func (p *persistence) sanitizeSeries(
 		// consistent with the checkpoint, so we have to take a closer
 		// look.
 		if s.headChunkClosed {
-			// This is the easy case as we don't have any chunks in
-			// heads.db. Treat this series as a freshly unarchived
-			// one. No chunks or chunkDescs in memory, no current
-			// head chunk.
+			// This is the easy case as we have all chunks on
+			// disk. Treat this series as a freshly unarchived one
+			// by loading the chunkDescs and setting all parameters
+			// based on the loaded chunkDescs.
+			cds, err := p.loadChunkDescs(fp, clientmodel.Latest)
+			if err != nil {
+				log.Errorf(
+					"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
+					s.metric, fp, err,
+				)
+				purge()
+				return fp, false
+			}
 			log.Warnf(
 				"Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.",
-				s.metric, fp, chunksInFile,
+				s.metric, fp, len(cds),
 			)
-			s.chunkDescs = nil
-			s.chunkDescsOffset = chunksInFile
-			s.persistWatermark = 0
+			s.chunkDescs = cds
+			s.chunkDescsOffset = 0
+			s.savedFirstTime = cds[0].firstTime()
+			s.lastTime = cds[len(cds)-1].lastTime()
+			s.persistWatermark = len(cds)
 			s.modTime = modTime
 			return fp, true
 		}
@@ -275,8 +286,8 @@ func (p *persistence) sanitizeSeries(
 		// First, throw away the chunkDescs without chunks.
 		s.chunkDescs = s.chunkDescs[s.persistWatermark:]
 		numMemChunkDescs.Sub(float64(s.persistWatermark))
-		// Load all the chunk descs (which assumes we have none from the future).
-		cds, err := p.loadChunkDescs(fp, clientmodel.Now())
+		// Load all the chunk descs.
+		cds, err := p.loadChunkDescs(fp, clientmodel.Latest)
 		if err != nil {
 			log.Errorf(
 				"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
@@ -287,6 +298,7 @@ func (p *persistence) sanitizeSeries(
 		}
 		s.persistWatermark = len(cds)
 		s.chunkDescsOffset = 0
+		s.savedFirstTime = cds[0].firstTime()
 		s.modTime = modTime

 		lastTime := cds[len(cds)-1].lastTime()
@@ -395,14 +407,11 @@ func (p *persistence) cleanUpArchiveIndexes(
 		if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
 			return err
 		}
-		series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest)
-		cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
+		cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Latest)
 		if err != nil {
 			return err
 		}
-		series.chunkDescs = cds
-		series.chunkDescsOffset = 0
-		series.persistWatermark = len(cds)
+		series := newMemorySeries(clientmodel.Metric(m), cds, p.seriesFileModTime(clientmodel.Fingerprint(fp)))
 		fpToSeries[clientmodel.Fingerprint(fp)] = series
 		return nil
 	}); err != nil {


@@ -94,9 +94,11 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
 	}

 	baseValue := c.baseValue()
-	// TODO(beorn7): Once https://github.com/prometheus/prometheus/issues/481 is
-	// fixed, we should panic here if dt is negative.
 	dt := s.Timestamp - c.baseTime()
+	if dt < 0 {
+		panic("time delta is less than zero")
+	}
+
 	dv := s.Value - baseValue
 	tb := c.timeBytes()
 	vb := c.valueBytes()
@@ -382,7 +384,7 @@ func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel.Times
 		// Take absolute value for d8.
 		return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
 	default:
-		panic("Invalid number of bytes for time delta")
+		panic("invalid number of bytes for time delta")
 	}
 }
@@ -407,7 +409,7 @@ func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.Sam
 			return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
 		// No d8 for ints.
 		default:
-			panic("Invalid number of bytes for integer delta")
+			panic("invalid number of bytes for integer delta")
 		}
 	} else {
 		switch it.vBytes {
@@ -417,7 +419,7 @@ func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.Sam
 			// Take absolute value for d8.
 			return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
 		default:
-			panic("Invalid number of bytes for floating point delta")
+			panic("invalid number of bytes for floating point delta")
 		}
 	}
 }


@@ -339,10 +339,7 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk {
 func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb deltaBytes) []chunk {
 	baseTimeDelta := s.Timestamp - c.baseTime()
 	if baseTimeDelta < 0 {
-		// TODO(beorn7): We ignore this irregular case for now. Once
-		// https://github.com/prometheus/prometheus/issues/481 is
-		// fixed, we should panic here instead.
-		return []chunk{&c}
+		panic("base time delta is less than zero")
 	}
 	c = c[:doubleDeltaHeaderBytes]
 	if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
@@ -511,7 +508,7 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel
 		// Take absolute value for d8.
 		return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
 	default:
-		panic("Invalid number of bytes for time delta")
+		panic("invalid number of bytes for time delta")
 	}
 }
@@ -555,7 +552,7 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmod
 				clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
 		// No d8 for ints.
 		default:
-			panic("Invalid number of bytes for integer delta")
+			panic("invalid number of bytes for integer delta")
 		}
 	} else {
 		switch it.vBytes {
@@ -567,7 +564,7 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmod
 			// Take absolute value for d8.
 			return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
 		default:
-			panic("Invalid number of bytes for floating point delta")
+			panic("invalid number of bytes for floating point delta")
 		}
 	}
 }


@@ -109,13 +109,18 @@ func (l *LevelDB) Delete(key encoding.BinaryMarshaler) (bool, error) {
 	if err != nil {
 		return false, err
 	}
-	err = l.storage.Delete(k, l.writeOpts)
+	// Note that Delete returns nil if k does not exist. So we have to test
+	// for existence with Get first.
+	_, err = l.storage.Get(k, l.readOpts)
 	if err == leveldb.ErrNotFound {
 		return false, nil
 	}
 	if err != nil {
 		return false, err
 	}
+	if err = l.storage.Delete(k, l.writeOpts); err != nil {
+		return false, err
+	}
 	return true, nil
 }
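
For context on why the fix above probes with Get first: goleveldb's DB.Delete returns a nil error even when the key is absent (it merely writes a deletion marker), so Delete alone cannot report whether anything was actually removed. A small standalone demonstration, assuming the github.com/syndtr/goleveldb/leveldb package that backs this store:

package main

import (
	"fmt"
	"log"

	"github.com/syndtr/goleveldb/leveldb"
)

func main() {
	db, err := leveldb.OpenFile("/tmp/delete-demo", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Deleting a key that was never written still returns a nil error.
	fmt.Println(db.Delete([]byte("no-such-key"), nil)) // <nil>

	// To get "did it exist?" semantics, probe with Get first, exactly
	// as the patched KeyValueStore.Delete does.
	_, err = db.Get([]byte("no-such-key"), nil)
	fmt.Println(err == leveldb.ErrNotFound) // true
}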


@@ -643,7 +643,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 					return
 				}
 			} else {
-				// This is the non-persisted head chunk. Fully marshal it.
+				// This is a non-persisted chunk. Fully marshal it.
 				if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil {
 					return
 				}
@@ -853,6 +853,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
 				modTime:          modTime,
 				chunkDescsOffset: int(chunkDescsOffset),
 				savedFirstTime:   clientmodel.Timestamp(savedFirstTime),
+				lastTime:         chunkDescs[len(chunkDescs)-1].lastTime(),
 				headChunkClosed:  persistWatermark >= numChunkDescs,
 			}
 		}
@@ -1173,38 +1174,27 @@ func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error
 // unarchiveMetric deletes an archived fingerprint and its metric, but (in
 // contrast to purgeArchivedMetric) does not un-index the metric. If a metric
-// was actually deleted, the method returns true and the first time of the
-// deleted metric. The caller must have locked the fingerprint.
-func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (
-	deletedAnything bool,
-	firstDeletedTime clientmodel.Timestamp,
-	err error,
-) {
+// was actually deleted, the method returns true and the first time and last
+// time of the deleted metric. The caller must have locked the fingerprint.
+func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (deletedAnything bool, err error) {
 	defer func() {
 		if err != nil {
 			p.setDirty(true)
 		}
 	}()

-	firstTime, _, has, err := p.archivedFingerprintToTimeRange.Lookup(fp)
-	if err != nil || !has {
-		return false, firstTime, err
-	}
 	deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
-	if err != nil {
-		return false, firstTime, err
-	}
-	if !deleted {
-		log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToMetrics index. This should never happen.", fp)
+	if err != nil || !deleted {
+		return false, err
 	}
 	deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp))
 	if err != nil {
-		return false, firstTime, err
+		return false, err
 	}
 	if !deleted {
 		log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp)
 	}
-	return true, firstTime, nil
+	return true, nil
 }

 // close flushes the indexing queue and other buffered data and releases any


@@ -17,6 +17,7 @@ import (
 	"reflect"
 	"sync"
 	"testing"
+	"time"

 	clientmodel "github.com/prometheus/client_golang/model"
@@ -354,11 +355,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
 	fpLocker := newFingerprintLocker(10)
 	sm := newSeriesMap()

-	s1 := newMemorySeries(m1, true, 0)
-	s2 := newMemorySeries(m2, false, 0)
-	s3 := newMemorySeries(m3, false, 0)
-	s4 := newMemorySeries(m4, true, 0)
-	s5 := newMemorySeries(m5, true, 0)
+	s1 := newMemorySeries(m1, nil, time.Time{})
+	s2 := newMemorySeries(m2, nil, time.Time{})
+	s3 := newMemorySeries(m3, nil, time.Time{})
+	s4 := newMemorySeries(m4, nil, time.Time{})
+	s5 := newMemorySeries(m5, nil, time.Time{})
 	s1.add(&metric.SamplePair{Timestamp: 1, Value: 3.14})
 	s3.add(&metric.SamplePair{Timestamp: 2, Value: 2.7})
 	s3.headChunkClosed = true
@@ -416,8 +417,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
 	if loadedS3.head().c != nil {
 		t.Error("head chunk not evicted")
 	}
-	if loadedS3.chunkDescsOffset != -1 {
-		t.Errorf("want chunkDescsOffset -1, got %d", loadedS3.chunkDescsOffset)
+	if loadedS3.chunkDescsOffset != 0 {
+		t.Errorf("want chunkDescsOffset 0, got %d", loadedS3.chunkDescsOffset)
 	}
 	if !loadedS3.headChunkClosed {
 		t.Error("headChunkClosed is false")
@@ -546,22 +547,19 @@ func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
 		}
 	}

-	unarchived, firstTime, err := p.unarchiveMetric(1)
+	unarchived, err := p.unarchiveMetric(1)
 	if err != nil {
 		t.Fatal(err)
 	}
 	if !unarchived {
-		t.Fatal("expected actual unarchival")
+		t.Error("expected actual unarchival")
 	}
-	if firstTime != 2 {
-		t.Errorf("expected first time 2, got %v", firstTime)
-	}
-	unarchived, firstTime, err = p.unarchiveMetric(1)
+	unarchived, err = p.unarchiveMetric(1)
 	if err != nil {
 		t.Fatal(err)
 	}
 	if unarchived {
-		t.Fatal("expected no unarchival")
+		t.Error("expected no unarchival")
 	}

 	expectedFPs = map[clientmodel.Timestamp][]clientmodel.Fingerprint{
@@ -831,16 +829,13 @@ func testIndexing(t *testing.T, encoding chunkEncoding) {
 		verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p)
 		for fp, m := range b.fpToMetric {
 			p.unindexMetric(fp, m)
-			unarchived, firstTime, err := p.unarchiveMetric(fp)
+			unarchived, err := p.unarchiveMetric(fp)
 			if err != nil {
 				t.Fatal(err)
 			}
 			if !unarchived {
 				t.Errorf("%d. metric not unarchived", i)
 			}
-			if firstTime != 1 {
-				t.Errorf("%d. expected firstTime=1, got %v", i, firstTime)
-			}
 			delete(indexedFpsToMetrics, fp)
 		}
 	}


@@ -162,6 +162,9 @@ type memorySeries struct {
 	// first chunk before its chunk desc is evicted. In doubt, this field is
 	// just set to the oldest possible timestamp.
 	savedFirstTime clientmodel.Timestamp
+	// The timestamp of the last sample in this series. Needed for fast access to
+	// ensure timestamp monotonicity during ingestion.
+	lastTime clientmodel.Timestamp
 	// Whether the current head chunk has already been finished. If true,
 	// the current head chunk must not be modified anymore.
 	headChunkClosed bool
@@ -175,28 +178,29 @@ type memorySeries struct {
 }

 // newMemorySeries returns a pointer to a newly allocated memorySeries for the
-// given metric. reallyNew defines if the memorySeries is a genuinely new series
-// or (if false) a series for a metric being unarchived, i.e. a series that
-// existed before but has been evicted from memory. If reallyNew is false,
-// firstTime is ignored (and set to the lowest possible timestamp instead - it
-// will be set properly upon the first eviction of chunkDescs).
-func newMemorySeries(
-	m clientmodel.Metric,
-	reallyNew bool,
-	firstTime clientmodel.Timestamp,
-) *memorySeries {
-	if !reallyNew {
-		firstTime = clientmodel.Earliest
+// given metric. chunkDescs and modTime in the new series are set according to
+// the provided parameters. chunkDescs can be nil or empty if this is a
+// genuinely new time series (i.e. not one that is being unarchived). In that
+// case, headChunkClosed is set to false, and firstTime and lastTime are both
+// set to clientmodel.Earliest. The zero value for modTime can be used if the
+// modification time of the series file is unknown (e.g. if this is a genuinely
+// new series).
+func newMemorySeries(m clientmodel.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries {
+	firstTime := clientmodel.Earliest
+	lastTime := clientmodel.Earliest
+	if len(chunkDescs) > 0 {
+		firstTime = chunkDescs[0].firstTime()
+		lastTime = chunkDescs[len(chunkDescs)-1].lastTime()
 	}
-	s := memorySeries{
+	return &memorySeries{
 		metric:           m,
-		headChunkClosed: !reallyNew,
-		savedFirstTime:  firstTime,
+		chunkDescs:       chunkDescs,
+		headChunkClosed:  len(chunkDescs) > 0,
+		savedFirstTime:   firstTime,
+		lastTime:         lastTime,
+		persistWatermark: len(chunkDescs),
+		modTime:          modTime,
 	}
-	if !reallyNew {
-		s.chunkDescsOffset = -1
-	}
-	return &s
 }

 // add adds a sample pair to the series. It returns the number of newly
@@ -231,6 +235,8 @@ func (s *memorySeries) add(v *metric.SamplePair) int {
 	for _, c := range chunks[1:] {
 		s.chunkDescs = append(s.chunkDescs, newChunkDesc(c))
 	}

+	s.lastTime = v.Timestamp
+
 	return len(chunks) - 1
 }
@@ -243,7 +249,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool {
 	if s.headChunkClosed {
 		return false
 	}
-	if time.Now().Sub(s.head().lastTime().Time()) > headChunkTimeout {
+	if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout {
 		s.headChunkClosed = true
 		// Since we cannot modify the head chunk from now on, we
 		// don't need to bother with cloning anymore.
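
A simplified sketch of the constructor contract documented in the hunk above, with illustrative stand-in types only: nil or empty chunkDescs yields a genuinely new series with an open head chunk and sentinel times, while loaded chunkDescs yield an unarchived series whose state is derived entirely from the descriptors:

package main

import (
	"fmt"
	"math"
	"time"
)

// Stand-in for a chunk descriptor covering [first, last].
type chunkDesc struct{ first, last int64 }

type memorySeries struct {
	savedFirstTime   int64
	lastTime         int64
	headChunkClosed  bool
	persistWatermark int
	modTime          time.Time
}

func newMemorySeries(cds []chunkDesc, modTime time.Time) *memorySeries {
	// math.MinInt64 stands in for clientmodel.Earliest.
	firstTime, lastTime := int64(math.MinInt64), int64(math.MinInt64)
	if len(cds) > 0 {
		firstTime = cds[0].first
		lastTime = cds[len(cds)-1].last
	}
	return &memorySeries{
		savedFirstTime:   firstTime,
		lastTime:         lastTime,
		headChunkClosed:  len(cds) > 0, // an unarchived series has no open head chunk
		persistWatermark: len(cds),     // everything loaded from disk is already persisted
		modTime:          modTime,
	}
}

func main() {
	fresh := newMemorySeries(nil, time.Time{})
	unarchived := newMemorySeries([]chunkDesc{{1, 5}, {6, 9}}, time.Now())
	fmt.Println(fresh.headChunkClosed, unarchived.headChunkClosed, unarchived.lastTime) // false true 9
}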


@@ -135,6 +135,7 @@ type memorySeriesStorage struct {
 	numSeries                   prometheus.Gauge
 	seriesOps                   *prometheus.CounterVec
 	ingestedSamplesCount        prometheus.Counter
+	outOfOrderSamplesCount      prometheus.Counter
 	invalidPreloadRequestsCount prometheus.Counter
 	maintainSeriesDuration      *prometheus.SummaryVec
 }
@@ -203,6 +204,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
 			Name:      "ingested_samples_total",
 			Help:      "The total number of samples ingested.",
 		}),
+		outOfOrderSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "out_of_order_samples_total",
+			Help:      "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
+		}),
 		invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -550,6 +557,13 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
 		s.fpLocker.Lock(fp)
 	}
 	series := s.getOrCreateSeries(fp, sample.Metric)

+	if sample.Timestamp <= series.lastTime {
+		s.fpLocker.Unlock(fp)
+		log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime)
+		s.outOfOrderSamplesCount.Inc()
+		return
+	}
 	completedChunksCount := series.add(&metric.SamplePair{
 		Value:     sample.Value,
 		Timestamp: sample.Timestamp,
@@ -562,18 +576,30 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
 	series, ok := s.fpToSeries.get(fp)
 	if !ok {
-		unarchived, firstTime, err := s.persistence.unarchiveMetric(fp)
+		var cds []*chunkDesc
+		var modTime time.Time
+		unarchived, err := s.persistence.unarchiveMetric(fp)
 		if err != nil {
-			log.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
+			log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err)
 		}
 		if unarchived {
 			s.seriesOps.WithLabelValues(unarchive).Inc()
+			// We have to load chunkDescs anyway to do anything with
+			// the series, so let's do it right now so that we don't
+			// end up with a series without any chunkDescs for a
+			// while (which is confusing as it makes the series
+			// appear as archived or purged).
+			cds, err = s.loadChunkDescs(fp, clientmodel.Latest)
+			if err != nil {
+				log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err)
+			}
+			modTime = s.persistence.seriesFileModTime(fp)
 		} else {
 			// This was a genuinely new series, so index the metric.
 			s.persistence.indexMetric(fp, m)
 			s.seriesOps.WithLabelValues(create).Inc()
 		}
-		series = newMemorySeries(m, !unarchived, firstTime)
+		series = newMemorySeries(m, cds, modTime)
 		s.fpToSeries.put(fp, series)
 		s.numSeries.Inc()
 	}
@@ -943,21 +969,8 @@ func (s *memorySeriesStorage) maintainMemorySeries(
 		if iOldestNotEvicted == -1 {
 			s.fpToSeries.del(fp)
 			s.numSeries.Dec()
-			// Make sure we have a head chunk descriptor (a freshly
-			// unarchived series has none).
-			if len(series.chunkDescs) == 0 {
-				cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
-				if err != nil {
-					log.Errorf(
-						"Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v",
-						series.metric, err,
-					)
-					return
-				}
-				series.chunkDescs = cds
-			}
 			if err := s.persistence.archiveMetric(
-				fp, series.metric, series.firstTime(), series.head().lastTime(),
+				fp, series.metric, series.firstTime(), series.lastTime,
 			); err != nil {
 				log.Errorf("Error archiving metric %v: %v", series.metric, err)
 				return
@@ -1154,6 +1167,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	ch <- s.numSeries.Desc()
 	s.seriesOps.Describe(ch)
 	ch <- s.ingestedSamplesCount.Desc()
+	ch <- s.outOfOrderSamplesCount.Desc()
 	ch <- s.invalidPreloadRequestsCount.Desc()
 	ch <- numMemChunksDesc
 	s.maintainSeriesDuration.Describe(ch)
@@ -1178,6 +1192,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 	ch <- s.numSeries
 	s.seriesOps.Collect(ch)
 	ch <- s.ingestedSamplesCount
+	ch <- s.outOfOrderSamplesCount
 	ch <- s.invalidPreloadRequestsCount
 	ch <- prometheus.MustNewConstMetric(
 		numMemChunksDesc,


@@ -17,6 +17,7 @@ import (
 	"fmt"
 	"hash/fnv"
 	"math/rand"
+	"reflect"
 	"testing"
 	"testing/quick"
 	"time"
@@ -377,7 +378,7 @@ func TestRetentionCutoff(t *testing.T) {
 	s.WaitForIndexing()

 	var fp clientmodel.Fingerprint
-	for f := range s.fingerprintsForLabelPairs(metric.LabelPair{"job", "test"}) {
+	for f := range s.fingerprintsForLabelPairs(metric.LabelPair{Name: "job", Value: "test"}) {
 		fp = f
 		break
 	}
@@ -398,7 +399,7 @@ func TestRetentionCutoff(t *testing.T) {
 		t.Errorf("unexpected result for timestamp before retention period")
 	}

-	vals = it.RangeValues(metric.Interval{insertStart, now})
+	vals = it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
 	// We get 59 values here because the clientmodel.Now() is slightly later
 	// than our now.
 	if len(vals) != 59 {
@@ -408,7 +409,7 @@ func TestRetentionCutoff(t *testing.T) {
 		t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
 	}

-	vals = it.BoundaryValues(metric.Interval{insertStart, now})
+	vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
 	if len(vals) != 2 {
 		t.Errorf("expected 2 values but got %d", len(vals))
 	}
@@ -441,7 +442,7 @@ func TestDropMetrics(t *testing.T) {
 	}
 	s.WaitForIndexing()

-	fps := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
+	fps := s.fingerprintsForLabelPairs(metric.LabelPair{Name: clientmodel.MetricNameLabel, Value: "test"})
 	if len(fps) != 2 {
 		t.Fatalf("unexpected number of fingerprints: %d", len(fps))
 	}
@@ -449,7 +450,7 @@ func TestDropMetrics(t *testing.T) {
 	var fpList clientmodel.Fingerprints
 	for fp := range fps {
 		it := s.NewIterator(fp)
-		if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
+		if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
 			t.Fatalf("unexpected number of samples: %d", len(vals))
 		}
 		fpList = append(fpList, fp)
@@ -458,34 +459,38 @@ func TestDropMetrics(t *testing.T) {
 	s.DropMetricsForFingerprints(fpList[0])
 	s.WaitForIndexing()

-	fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
+	fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{
+		Name: clientmodel.MetricNameLabel, Value: "test",
+	})
 	if len(fps2) != 1 {
 		t.Fatalf("unexpected number of fingerprints: %d", len(fps2))
 	}

 	it := s.NewIterator(fpList[0])
-	if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
+	if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
 		t.Fatalf("unexpected number of samples: %d", len(vals))
 	}
 	it = s.NewIterator(fpList[1])
-	if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
+	if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
 		t.Fatalf("unexpected number of samples: %d", len(vals))
 	}

 	s.DropMetricsForFingerprints(fpList...)
 	s.WaitForIndexing()

-	fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"})
+	fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{
+		Name: clientmodel.MetricNameLabel, Value: "test",
+	})
 	if len(fps3) != 0 {
 		t.Fatalf("unexpected number of fingerprints: %d", len(fps3))
 	}

 	it = s.NewIterator(fpList[0])
-	if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
+	if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
 		t.Fatalf("unexpected number of samples: %d", len(vals))
 	}
 	it = s.NewIterator(fpList[1])
-	if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
+	if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
 		t.Fatalf("unexpected number of samples: %d", len(vals))
 	}
 }
@@ -1405,3 +1410,50 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Sam
 	}
 	return result
 }
+
+func TestAppendOutOfOrder(t *testing.T) {
+	s, closer := NewTestStorage(t, 1)
+	defer closer.Close()
+
+	m := clientmodel.Metric{
+		clientmodel.MetricNameLabel: "out_of_order",
+	}
+
+	for _, t := range []int{0, 2, 2, 1} {
+		s.Append(&clientmodel.Sample{
+			Metric:    m,
+			Timestamp: clientmodel.Timestamp(t),
+			Value:     clientmodel.SampleValue(t),
+		})
+	}
+
+	fp, err := s.mapper.mapFP(m.FastFingerprint(), m)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pl := s.NewPreloader()
+	defer pl.Close()
+
+	err = pl.PreloadRange(fp, 0, 2, 5*time.Minute)
+	if err != nil {
+		t.Fatalf("error preloading chunks: %s", err)
+	}
+
+	it := s.NewIterator(fp)
+
+	want := metric.Values{
+		{
+			Timestamp: 0,
+			Value:     0,
+		},
+		{
+			Timestamp: 2,
+			Value:     2,
+		},
+	}
+	got := it.RangeValues(metric.Interval{OldestInclusive: 0, NewestInclusive: 2})
+	if !reflect.DeepEqual(want, got) {
+		t.Fatalf("want %v, got %v", want, got)
+	}
+}