From ff08f0b6fe1c2c9010537bd3b9acf051744f91bb Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 13 Jul 2015 21:12:27 +0200 Subject: [PATCH 1/3] storage: ensure timestamp monotonicity within series. Fixes https://github.com/prometheus/prometheus/issues/481 While doing so, clean up and fix a few other things: - Fix `go vet` warnings (@fabxc to blame ;). - Fix a racey problem with unarchiving: Whenever we unarchive a series, we essentially want to do something with it. However, until we have done something with it, it appears like a series that is ready to be archived or even purged. So e.g. it would be ignored during checkpointing. With this fix, we always load the chunkDescs upon unarchiving. This is wasteful if we only want to add a new sample to an archived time series, but the (presumably more common) case where we access an archived time series in a query doesn't become more expensive. - The change above streamlined the getOrCreateSeries ond newMemorySeries flow. Also, the modTime is now always set correctly. - Fix the leveldb-backed implementation of KeyValueStore.Delete. It had the wrong behavior of still returning true, nil if a non-existing key has been passed in. --- storage/local/crashrecovery.go | 39 +++++++++------- storage/local/delta.go | 12 ++--- storage/local/doubledelta.go | 11 ++--- storage/local/index/leveldb.go | 7 ++- storage/local/persistence.go | 28 ++++-------- storage/local/persistence_test.go | 31 ++++++------- storage/local/series.go | 48 +++++++++++--------- storage/local/storage.go | 49 +++++++++++++------- storage/local/storage_test.go | 74 ++++++++++++++++++++++++++----- 9 files changed, 185 insertions(+), 114 deletions(-) diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index d52bdfcb0..cbb0061bf 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -250,17 +250,28 @@ func (p *persistence) sanitizeSeries( // consistent with the checkpoint, so we have to take a closer // look. if s.headChunkClosed { - // This is the easy case as we don't have any chunks in - // heads.db. Treat this series as a freshly unarchived - // one. No chunks or chunkDescs in memory, no current - // head chunk. + // This is the easy case as we have all chunks on + // disk. Treat this series as a freshly unarchived one + // by loading the chunkDescs and setting all parameters + // based on the loaded chunkDescs. + cds, err := p.loadChunkDescs(fp, clientmodel.Latest) + if err != nil { + log.Errorf( + "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", + s.metric, fp, err, + ) + purge() + return fp, false + } log.Warnf( "Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.", - s.metric, fp, chunksInFile, + s.metric, fp, len(cds), ) - s.chunkDescs = nil - s.chunkDescsOffset = chunksInFile - s.persistWatermark = 0 + s.chunkDescs = cds + s.chunkDescsOffset = 0 + s.savedFirstTime = cds[0].firstTime() + s.lastTime = cds[len(cds)-1].lastTime() + s.persistWatermark = len(cds) s.modTime = modTime return fp, true } @@ -275,8 +286,8 @@ func (p *persistence) sanitizeSeries( // First, throw away the chunkDescs without chunks. s.chunkDescs = s.chunkDescs[s.persistWatermark:] numMemChunkDescs.Sub(float64(s.persistWatermark)) - // Load all the chunk descs (which assumes we have none from the future). - cds, err := p.loadChunkDescs(fp, clientmodel.Now()) + // Load all the chunk descs. + cds, err := p.loadChunkDescs(fp, clientmodel.Latest) if err != nil { log.Errorf( "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", @@ -287,6 +298,7 @@ func (p *persistence) sanitizeSeries( } s.persistWatermark = len(cds) s.chunkDescsOffset = 0 + s.savedFirstTime = cds[0].firstTime() s.modTime = modTime lastTime := cds[len(cds)-1].lastTime() @@ -395,14 +407,11 @@ func (p *persistence) cleanUpArchiveIndexes( if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil { return err } - series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest) - cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) + cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Latest) if err != nil { return err } - series.chunkDescs = cds - series.chunkDescsOffset = 0 - series.persistWatermark = len(cds) + series := newMemorySeries(clientmodel.Metric(m), cds, p.seriesFileModTime(clientmodel.Fingerprint(fp))) fpToSeries[clientmodel.Fingerprint(fp)] = series return nil }); err != nil { diff --git a/storage/local/delta.go b/storage/local/delta.go index ebb92e3cd..ebff38f15 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -94,9 +94,11 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { } baseValue := c.baseValue() - // TODO(beorn7): Once https://github.com/prometheus/prometheus/issues/481 is - // fixed, we should panic here if dt is negative. dt := s.Timestamp - c.baseTime() + if dt < 0 { + panic("time delta is less than zero") + } + dv := s.Value - baseValue tb := c.timeBytes() vb := c.valueBytes() @@ -382,7 +384,7 @@ func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel.Times // Take absolute value for d8. return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:])) default: - panic("Invalid number of bytes for time delta") + panic("invalid number of bytes for time delta") } } @@ -407,7 +409,7 @@ func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.Sam return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) // No d8 for ints. default: - panic("Invalid number of bytes for integer delta") + panic("invalid number of bytes for integer delta") } } else { switch it.vBytes { @@ -417,7 +419,7 @@ func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.Sam // Take absolute value for d8. return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) default: - panic("Invalid number of bytes for floating point delta") + panic("invalid number of bytes for floating point delta") } } } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index f11471aa5..7377ab2bd 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -339,10 +339,7 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk { func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb deltaBytes) []chunk { baseTimeDelta := s.Timestamp - c.baseTime() if baseTimeDelta < 0 { - // TODO(beorn7): We ignore this irregular case for now. Once - // https://github.com/prometheus/prometheus/issues/481 is - // fixed, we should panic here instead. - return []chunk{&c} + panic("base time delta is less than zero") } c = c[:doubleDeltaHeaderBytes] if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { @@ -511,7 +508,7 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel // Take absolute value for d8. return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:])) default: - panic("Invalid number of bytes for time delta") + panic("invalid number of bytes for time delta") } } @@ -555,7 +552,7 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmod clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) // No d8 for ints. default: - panic("Invalid number of bytes for integer delta") + panic("invalid number of bytes for integer delta") } } else { switch it.vBytes { @@ -567,7 +564,7 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmod // Take absolute value for d8. return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) default: - panic("Invalid number of bytes for floating point delta") + panic("invalid number of bytes for floating point delta") } } } diff --git a/storage/local/index/leveldb.go b/storage/local/index/leveldb.go index 4a1345ab3..37630b8b3 100644 --- a/storage/local/index/leveldb.go +++ b/storage/local/index/leveldb.go @@ -109,13 +109,18 @@ func (l *LevelDB) Delete(key encoding.BinaryMarshaler) (bool, error) { if err != nil { return false, err } - err = l.storage.Delete(k, l.writeOpts) + // Note that Delete returns nil if k does not exist. So we have to test + // for existence with Get first. + _, err = l.storage.Get(k, l.readOpts) if err == leveldb.ErrNotFound { return false, nil } if err != nil { return false, err } + if err = l.storage.Delete(k, l.writeOpts); err != nil { + return false, err + } return true, nil } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 2e7cdb056..a629e23f4 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -643,7 +643,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap return } } else { - // This is the non-persisted head chunk. Fully marshal it. + // This is a non-persisted chunk. Fully marshal it. if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil { return } @@ -853,6 +853,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in modTime: modTime, chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: clientmodel.Timestamp(savedFirstTime), + lastTime: chunkDescs[len(chunkDescs)-1].lastTime(), headChunkClosed: persistWatermark >= numChunkDescs, } } @@ -1173,38 +1174,27 @@ func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error // unarchiveMetric deletes an archived fingerprint and its metric, but (in // contrast to purgeArchivedMetric) does not un-index the metric. If a metric -// was actually deleted, the method returns true and the first time of the -// deleted metric. The caller must have locked the fingerprint. -func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) ( - deletedAnything bool, - firstDeletedTime clientmodel.Timestamp, - err error, -) { +// was actually deleted, the method returns true and the first time and last +// time of the deleted metric. The caller must have locked the fingerprint. +func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (deletedAnything bool, err error) { defer func() { if err != nil { p.setDirty(true) } }() - firstTime, _, has, err := p.archivedFingerprintToTimeRange.Lookup(fp) - if err != nil || !has { - return false, firstTime, err - } deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)) - if err != nil { - return false, firstTime, err - } - if !deleted { - log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToMetrics index. This should never happen.", fp) + if err != nil || !deleted { + return false, err } deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)) if err != nil { - return false, firstTime, err + return false, err } if !deleted { log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp) } - return true, firstTime, nil + return true, nil } // close flushes the indexing queue and other buffered data and releases any diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 1954823a0..34bb672b3 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -17,6 +17,7 @@ import ( "reflect" "sync" "testing" + "time" clientmodel "github.com/prometheus/client_golang/model" @@ -354,11 +355,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding fpLocker := newFingerprintLocker(10) sm := newSeriesMap() - s1 := newMemorySeries(m1, true, 0) - s2 := newMemorySeries(m2, false, 0) - s3 := newMemorySeries(m3, false, 0) - s4 := newMemorySeries(m4, true, 0) - s5 := newMemorySeries(m5, true, 0) + s1 := newMemorySeries(m1, nil, time.Time{}) + s2 := newMemorySeries(m2, nil, time.Time{}) + s3 := newMemorySeries(m3, nil, time.Time{}) + s4 := newMemorySeries(m4, nil, time.Time{}) + s5 := newMemorySeries(m5, nil, time.Time{}) s1.add(&metric.SamplePair{Timestamp: 1, Value: 3.14}) s3.add(&metric.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkClosed = true @@ -416,8 +417,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if loadedS3.head().c != nil { t.Error("head chunk not evicted") } - if loadedS3.chunkDescsOffset != -1 { - t.Errorf("want chunkDescsOffset -1, got %d", loadedS3.chunkDescsOffset) + if loadedS3.chunkDescsOffset != 0 { + t.Errorf("want chunkDescsOffset 0, got %d", loadedS3.chunkDescsOffset) } if !loadedS3.headChunkClosed { t.Error("headChunkClosed is false") @@ -546,22 +547,19 @@ func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { } } - unarchived, firstTime, err := p.unarchiveMetric(1) + unarchived, err := p.unarchiveMetric(1) if err != nil { t.Fatal(err) } if !unarchived { - t.Fatal("expected actual unarchival") + t.Error("expected actual unarchival") } - if firstTime != 2 { - t.Errorf("expected first time 2, got %v", firstTime) - } - unarchived, firstTime, err = p.unarchiveMetric(1) + unarchived, err = p.unarchiveMetric(1) if err != nil { t.Fatal(err) } if unarchived { - t.Fatal("expected no unarchival") + t.Error("expected no unarchival") } expectedFPs = map[clientmodel.Timestamp][]clientmodel.Fingerprint{ @@ -831,16 +829,13 @@ func testIndexing(t *testing.T, encoding chunkEncoding) { verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p) for fp, m := range b.fpToMetric { p.unindexMetric(fp, m) - unarchived, firstTime, err := p.unarchiveMetric(fp) + unarchived, err := p.unarchiveMetric(fp) if err != nil { t.Fatal(err) } if !unarchived { t.Errorf("%d. metric not unarchived", i) } - if firstTime != 1 { - t.Errorf("%d. expected firstTime=1, got %v", i, firstTime) - } delete(indexedFpsToMetrics, fp) } } diff --git a/storage/local/series.go b/storage/local/series.go index 60c428441..07388a2e5 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -162,6 +162,9 @@ type memorySeries struct { // first chunk before its chunk desc is evicted. In doubt, this field is // just set to the oldest possible timestamp. savedFirstTime clientmodel.Timestamp + // The timestamp of the last sample in this series. Needed for fast access to + // ensure timestamp monotonicity during ingestion. + lastTime clientmodel.Timestamp // Whether the current head chunk has already been finished. If true, // the current head chunk must not be modified anymore. headChunkClosed bool @@ -175,28 +178,29 @@ type memorySeries struct { } // newMemorySeries returns a pointer to a newly allocated memorySeries for the -// given metric. reallyNew defines if the memorySeries is a genuinely new series -// or (if false) a series for a metric being unarchived, i.e. a series that -// existed before but has been evicted from memory. If reallyNew is false, -// firstTime is ignored (and set to the lowest possible timestamp instead - it -// will be set properly upon the first eviction of chunkDescs). -func newMemorySeries( - m clientmodel.Metric, - reallyNew bool, - firstTime clientmodel.Timestamp, -) *memorySeries { - if !reallyNew { - firstTime = clientmodel.Earliest +// given metric. chunkDescs and modTime in the new series are set according to +// the provided parameters. chunkDescs can be nil or empty if this is a +// genuinely new time series (i.e. not one that is being unarchived). In that +// case, headChunkClosed is set to false, and firstTime and lastTime are both +// set to clientmodel.Earliest. The zero value for modTime can be used if the +// modification time of the series file is unknown (e.g. if this is a genuinely +// new series). +func newMemorySeries(m clientmodel.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries { + firstTime := clientmodel.Earliest + lastTime := clientmodel.Earliest + if len(chunkDescs) > 0 { + firstTime = chunkDescs[0].firstTime() + lastTime = chunkDescs[len(chunkDescs)-1].lastTime() } - s := memorySeries{ - metric: m, - headChunkClosed: !reallyNew, - savedFirstTime: firstTime, + return &memorySeries{ + metric: m, + chunkDescs: chunkDescs, + headChunkClosed: len(chunkDescs) > 0, + savedFirstTime: firstTime, + lastTime: lastTime, + persistWatermark: len(chunkDescs), + modTime: modTime, } - if !reallyNew { - s.chunkDescsOffset = -1 - } - return &s } // add adds a sample pair to the series. It returns the number of newly @@ -231,6 +235,8 @@ func (s *memorySeries) add(v *metric.SamplePair) int { for _, c := range chunks[1:] { s.chunkDescs = append(s.chunkDescs, newChunkDesc(c)) } + + s.lastTime = v.Timestamp return len(chunks) - 1 } @@ -243,7 +249,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool { if s.headChunkClosed { return false } - if time.Now().Sub(s.head().lastTime().Time()) > headChunkTimeout { + if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout { s.headChunkClosed = true // Since we cannot modify the head chunk from now on, we // don't need to bother with cloning anymore. diff --git a/storage/local/storage.go b/storage/local/storage.go index 66b7aa961..90e1de21a 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -135,6 +135,7 @@ type memorySeriesStorage struct { numSeries prometheus.Gauge seriesOps *prometheus.CounterVec ingestedSamplesCount prometheus.Counter + outOfOrderSamplesCount prometheus.Counter invalidPreloadRequestsCount prometheus.Counter maintainSeriesDuration *prometheus.SummaryVec } @@ -203,6 +204,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { Name: "ingested_samples_total", Help: "The total number of samples ingested.", }), + outOfOrderSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "out_of_order_samples_total", + Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.", + }), invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -550,6 +557,13 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) { s.fpLocker.Lock(fp) } series := s.getOrCreateSeries(fp, sample.Metric) + + if sample.Timestamp <= series.lastTime { + s.fpLocker.Unlock(fp) + log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime) + s.outOfOrderSamplesCount.Inc() + return + } completedChunksCount := series.add(&metric.SamplePair{ Value: sample.Value, Timestamp: sample.Timestamp, @@ -562,18 +576,30 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) { func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries { series, ok := s.fpToSeries.get(fp) if !ok { - unarchived, firstTime, err := s.persistence.unarchiveMetric(fp) + var cds []*chunkDesc + var modTime time.Time + unarchived, err := s.persistence.unarchiveMetric(fp) if err != nil { - log.Errorf("Error unarchiving fingerprint %v: %v", fp, err) + log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err) } if unarchived { s.seriesOps.WithLabelValues(unarchive).Inc() + // We have to load chunkDescs anyway to do anything with + // the series, so let's do it right now so that we don't + // end up with a series without any chunkDescs for a + // while (which is confusing as it makes the series + // appear as archived or purged). + cds, err = s.loadChunkDescs(fp, clientmodel.Latest) + if err != nil { + log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err) + } + modTime = s.persistence.seriesFileModTime(fp) } else { // This was a genuinely new series, so index the metric. s.persistence.indexMetric(fp, m) s.seriesOps.WithLabelValues(create).Inc() } - series = newMemorySeries(m, !unarchived, firstTime) + series = newMemorySeries(m, cds, modTime) s.fpToSeries.put(fp, series) s.numSeries.Inc() } @@ -943,21 +969,8 @@ func (s *memorySeriesStorage) maintainMemorySeries( if iOldestNotEvicted == -1 { s.fpToSeries.del(fp) s.numSeries.Dec() - // Make sure we have a head chunk descriptor (a freshly - // unarchived series has none). - if len(series.chunkDescs) == 0 { - cds, err := s.loadChunkDescs(fp, clientmodel.Latest) - if err != nil { - log.Errorf( - "Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", - series.metric, err, - ) - return - } - series.chunkDescs = cds - } if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.head().lastTime(), + fp, series.metric, series.firstTime(), series.lastTime, ); err != nil { log.Errorf("Error archiving metric %v: %v", series.metric, err) return @@ -1154,6 +1167,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { ch <- s.numSeries.Desc() s.seriesOps.Describe(ch) ch <- s.ingestedSamplesCount.Desc() + ch <- s.outOfOrderSamplesCount.Desc() ch <- s.invalidPreloadRequestsCount.Desc() ch <- numMemChunksDesc s.maintainSeriesDuration.Describe(ch) @@ -1178,6 +1192,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { ch <- s.numSeries s.seriesOps.Collect(ch) ch <- s.ingestedSamplesCount + ch <- s.outOfOrderSamplesCount ch <- s.invalidPreloadRequestsCount ch <- prometheus.MustNewConstMetric( numMemChunksDesc, diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index da627ac74..e4dfd866e 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -17,6 +17,7 @@ import ( "fmt" "hash/fnv" "math/rand" + "reflect" "testing" "testing/quick" "time" @@ -377,7 +378,7 @@ func TestRetentionCutoff(t *testing.T) { s.WaitForIndexing() var fp clientmodel.Fingerprint - for f := range s.fingerprintsForLabelPairs(metric.LabelPair{"job", "test"}) { + for f := range s.fingerprintsForLabelPairs(metric.LabelPair{Name: "job", Value: "test"}) { fp = f break } @@ -398,7 +399,7 @@ func TestRetentionCutoff(t *testing.T) { t.Errorf("unexpected result for timestamp before retention period") } - vals = it.RangeValues(metric.Interval{insertStart, now}) + vals = it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) // We get 59 values here because the clientmodel.Now() is slightly later // than our now. if len(vals) != 59 { @@ -408,7 +409,7 @@ func TestRetentionCutoff(t *testing.T) { t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) } - vals = it.BoundaryValues(metric.Interval{insertStart, now}) + vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) if len(vals) != 2 { t.Errorf("expected 2 values but got %d", len(vals)) } @@ -441,7 +442,7 @@ func TestDropMetrics(t *testing.T) { } s.WaitForIndexing() - fps := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"}) + fps := s.fingerprintsForLabelPairs(metric.LabelPair{Name: clientmodel.MetricNameLabel, Value: "test"}) if len(fps) != 2 { t.Fatalf("unexpected number of fingerprints: %d", len(fps)) } @@ -449,7 +450,7 @@ func TestDropMetrics(t *testing.T) { var fpList clientmodel.Fingerprints for fp := range fps { it := s.NewIterator(fp) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Fatalf("unexpected number of samples: %d", len(vals)) } fpList = append(fpList, fp) @@ -458,34 +459,38 @@ func TestDropMetrics(t *testing.T) { s.DropMetricsForFingerprints(fpList[0]) s.WaitForIndexing() - fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"}) + fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{ + Name: clientmodel.MetricNameLabel, Value: "test", + }) if len(fps2) != 1 { t.Fatalf("unexpected number of fingerprints: %d", len(fps2)) } it := s.NewIterator(fpList[0]) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Fatalf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Fatalf("unexpected number of samples: %d", len(vals)) } s.DropMetricsForFingerprints(fpList...) s.WaitForIndexing() - fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"}) + fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{ + Name: clientmodel.MetricNameLabel, Value: "test", + }) if len(fps3) != 0 { t.Fatalf("unexpected number of fingerprints: %d", len(fps3)) } it = s.NewIterator(fpList[0]) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Fatalf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Fatalf("unexpected number of samples: %d", len(vals)) } } @@ -1405,3 +1410,50 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Sam } return result } + +func TestAppendOutOfOrder(t *testing.T) { + s, closer := NewTestStorage(t, 1) + defer closer.Close() + + m := clientmodel.Metric{ + clientmodel.MetricNameLabel: "out_of_order", + } + + for _, t := range []int{0, 2, 2, 1} { + s.Append(&clientmodel.Sample{ + Metric: m, + Timestamp: clientmodel.Timestamp(t), + Value: clientmodel.SampleValue(t), + }) + } + + fp, err := s.mapper.mapFP(m.FastFingerprint(), m) + if err != nil { + t.Fatal(err) + } + + pl := s.NewPreloader() + defer pl.Close() + + err = pl.PreloadRange(fp, 0, 2, 5*time.Minute) + if err != nil { + t.Fatalf("error preloading chunks: %s", err) + } + + it := s.NewIterator(fp) + + want := metric.Values{ + { + Timestamp: 0, + Value: 0, + }, + { + Timestamp: 2, + Value: 2, + }, + } + got := it.RangeValues(metric.Interval{OldestInclusive: 0, NewestInclusive: 2}) + if !reflect.DeepEqual(want, got) { + t.Fatalf("want %v, got %v", want, got) + } +} From 502aa9ded553f2b59f55e26db08374c1cc1d25d8 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 16 Jul 2015 12:25:32 +0200 Subject: [PATCH 2/3] Use Has instead of Get for existence test. --- storage/local/index/leveldb.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/storage/local/index/leveldb.go b/storage/local/index/leveldb.go index 37630b8b3..c4c46421c 100644 --- a/storage/local/index/leveldb.go +++ b/storage/local/index/leveldb.go @@ -110,12 +110,8 @@ func (l *LevelDB) Delete(key encoding.BinaryMarshaler) (bool, error) { return false, err } // Note that Delete returns nil if k does not exist. So we have to test - // for existence with Get first. - _, err = l.storage.Get(k, l.readOpts) - if err == leveldb.ErrNotFound { - return false, nil - } - if err != nil { + // for existence with Has first. + if has, err := l.storage.Has(k, l.readOpts); !has || err != nil { return false, err } if err = l.storage.Delete(k, l.writeOpts); err != nil { From 37e12df9ff773174fdff91ca148ae081f58f1889 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 16 Jul 2015 12:48:33 +0200 Subject: [PATCH 3/3] Improve TestAppendOutOfOrder --- storage/local/storage_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index e4dfd866e..813361051 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -1419,11 +1419,11 @@ func TestAppendOutOfOrder(t *testing.T) { clientmodel.MetricNameLabel: "out_of_order", } - for _, t := range []int{0, 2, 2, 1} { + for i, t := range []int{0, 2, 2, 1} { s.Append(&clientmodel.Sample{ Metric: m, Timestamp: clientmodel.Timestamp(t), - Value: clientmodel.SampleValue(t), + Value: clientmodel.SampleValue(i), }) } @@ -1449,7 +1449,7 @@ func TestAppendOutOfOrder(t *testing.T) { }, { Timestamp: 2, - Value: 2, + Value: 1, }, } got := it.RangeValues(metric.Interval{OldestInclusive: 0, NewestInclusive: 2})