diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index d52bdfcb08..cbb0061bf4 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -250,17 +250,28 @@ func (p *persistence) sanitizeSeries( // consistent with the checkpoint, so we have to take a closer // look. if s.headChunkClosed { - // This is the easy case as we don't have any chunks in - // heads.db. Treat this series as a freshly unarchived - // one. No chunks or chunkDescs in memory, no current - // head chunk. + // This is the easy case as we have all chunks on + // disk. Treat this series as a freshly unarchived one + // by loading the chunkDescs and setting all parameters + // based on the loaded chunkDescs. + cds, err := p.loadChunkDescs(fp, clientmodel.Latest) + if err != nil { + log.Errorf( + "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", + s.metric, fp, err, + ) + purge() + return fp, false + } log.Warnf( "Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.", - s.metric, fp, chunksInFile, + s.metric, fp, len(cds), ) - s.chunkDescs = nil - s.chunkDescsOffset = chunksInFile - s.persistWatermark = 0 + s.chunkDescs = cds + s.chunkDescsOffset = 0 + s.savedFirstTime = cds[0].firstTime() + s.lastTime = cds[len(cds)-1].lastTime() + s.persistWatermark = len(cds) s.modTime = modTime return fp, true } @@ -275,8 +286,8 @@ func (p *persistence) sanitizeSeries( // First, throw away the chunkDescs without chunks. s.chunkDescs = s.chunkDescs[s.persistWatermark:] numMemChunkDescs.Sub(float64(s.persistWatermark)) - // Load all the chunk descs (which assumes we have none from the future). - cds, err := p.loadChunkDescs(fp, clientmodel.Now()) + // Load all the chunk descs. + cds, err := p.loadChunkDescs(fp, clientmodel.Latest) if err != nil { log.Errorf( "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", @@ -287,6 +298,7 @@ func (p *persistence) sanitizeSeries( } s.persistWatermark = len(cds) s.chunkDescsOffset = 0 + s.savedFirstTime = cds[0].firstTime() s.modTime = modTime lastTime := cds[len(cds)-1].lastTime() @@ -395,14 +407,11 @@ func (p *persistence) cleanUpArchiveIndexes( if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil { return err } - series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest) - cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) + cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Latest) if err != nil { return err } - series.chunkDescs = cds - series.chunkDescsOffset = 0 - series.persistWatermark = len(cds) + series := newMemorySeries(clientmodel.Metric(m), cds, p.seriesFileModTime(clientmodel.Fingerprint(fp))) fpToSeries[clientmodel.Fingerprint(fp)] = series return nil }); err != nil { diff --git a/storage/local/delta.go b/storage/local/delta.go index ebb92e3cda..ebff38f15b 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -94,9 +94,11 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { } baseValue := c.baseValue() - // TODO(beorn7): Once https://github.com/prometheus/prometheus/issues/481 is - // fixed, we should panic here if dt is negative. dt := s.Timestamp - c.baseTime() + if dt < 0 { + panic("time delta is less than zero") + } + dv := s.Value - baseValue tb := c.timeBytes() vb := c.valueBytes() @@ -382,7 +384,7 @@ func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel.Times // Take absolute value for d8. return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:])) default: - panic("Invalid number of bytes for time delta") + panic("invalid number of bytes for time delta") } } @@ -407,7 +409,7 @@ func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.Sam return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) // No d8 for ints. default: - panic("Invalid number of bytes for integer delta") + panic("invalid number of bytes for integer delta") } } else { switch it.vBytes { @@ -417,7 +419,7 @@ func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.Sam // Take absolute value for d8. return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) default: - panic("Invalid number of bytes for floating point delta") + panic("invalid number of bytes for floating point delta") } } } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index f11471aa52..7377ab2bd0 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -339,10 +339,7 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk { func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb deltaBytes) []chunk { baseTimeDelta := s.Timestamp - c.baseTime() if baseTimeDelta < 0 { - // TODO(beorn7): We ignore this irregular case for now. Once - // https://github.com/prometheus/prometheus/issues/481 is - // fixed, we should panic here instead. - return []chunk{&c} + panic("base time delta is less than zero") } c = c[:doubleDeltaHeaderBytes] if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { @@ -511,7 +508,7 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel // Take absolute value for d8. return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:])) default: - panic("Invalid number of bytes for time delta") + panic("invalid number of bytes for time delta") } } @@ -555,7 +552,7 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmod clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) // No d8 for ints. default: - panic("Invalid number of bytes for integer delta") + panic("invalid number of bytes for integer delta") } } else { switch it.vBytes { @@ -567,7 +564,7 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmod // Take absolute value for d8. return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) default: - panic("Invalid number of bytes for floating point delta") + panic("invalid number of bytes for floating point delta") } } } diff --git a/storage/local/index/leveldb.go b/storage/local/index/leveldb.go index 4a1345ab3a..37630b8b3e 100644 --- a/storage/local/index/leveldb.go +++ b/storage/local/index/leveldb.go @@ -109,13 +109,18 @@ func (l *LevelDB) Delete(key encoding.BinaryMarshaler) (bool, error) { if err != nil { return false, err } - err = l.storage.Delete(k, l.writeOpts) + // Note that Delete returns nil if k does not exist. So we have to test + // for existence with Get first. + _, err = l.storage.Get(k, l.readOpts) if err == leveldb.ErrNotFound { return false, nil } if err != nil { return false, err } + if err = l.storage.Delete(k, l.writeOpts); err != nil { + return false, err + } return true, nil } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 2e7cdb0563..a629e23f43 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -643,7 +643,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap return } } else { - // This is the non-persisted head chunk. Fully marshal it. + // This is a non-persisted chunk. Fully marshal it. if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil { return } @@ -853,6 +853,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in modTime: modTime, chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: clientmodel.Timestamp(savedFirstTime), + lastTime: chunkDescs[len(chunkDescs)-1].lastTime(), headChunkClosed: persistWatermark >= numChunkDescs, } } @@ -1173,38 +1174,27 @@ func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error // unarchiveMetric deletes an archived fingerprint and its metric, but (in // contrast to purgeArchivedMetric) does not un-index the metric. If a metric -// was actually deleted, the method returns true and the first time of the -// deleted metric. The caller must have locked the fingerprint. -func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) ( - deletedAnything bool, - firstDeletedTime clientmodel.Timestamp, - err error, -) { +// was actually deleted, the method returns true and the first time and last +// time of the deleted metric. The caller must have locked the fingerprint. +func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (deletedAnything bool, err error) { defer func() { if err != nil { p.setDirty(true) } }() - firstTime, _, has, err := p.archivedFingerprintToTimeRange.Lookup(fp) - if err != nil || !has { - return false, firstTime, err - } deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)) - if err != nil { - return false, firstTime, err - } - if !deleted { - log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToMetrics index. This should never happen.", fp) + if err != nil || !deleted { + return false, err } deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)) if err != nil { - return false, firstTime, err + return false, err } if !deleted { log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp) } - return true, firstTime, nil + return true, nil } // close flushes the indexing queue and other buffered data and releases any diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 1954823a01..34bb672b39 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -17,6 +17,7 @@ import ( "reflect" "sync" "testing" + "time" clientmodel "github.com/prometheus/client_golang/model" @@ -354,11 +355,11 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding fpLocker := newFingerprintLocker(10) sm := newSeriesMap() - s1 := newMemorySeries(m1, true, 0) - s2 := newMemorySeries(m2, false, 0) - s3 := newMemorySeries(m3, false, 0) - s4 := newMemorySeries(m4, true, 0) - s5 := newMemorySeries(m5, true, 0) + s1 := newMemorySeries(m1, nil, time.Time{}) + s2 := newMemorySeries(m2, nil, time.Time{}) + s3 := newMemorySeries(m3, nil, time.Time{}) + s4 := newMemorySeries(m4, nil, time.Time{}) + s5 := newMemorySeries(m5, nil, time.Time{}) s1.add(&metric.SamplePair{Timestamp: 1, Value: 3.14}) s3.add(&metric.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkClosed = true @@ -416,8 +417,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if loadedS3.head().c != nil { t.Error("head chunk not evicted") } - if loadedS3.chunkDescsOffset != -1 { - t.Errorf("want chunkDescsOffset -1, got %d", loadedS3.chunkDescsOffset) + if loadedS3.chunkDescsOffset != 0 { + t.Errorf("want chunkDescsOffset 0, got %d", loadedS3.chunkDescsOffset) } if !loadedS3.headChunkClosed { t.Error("headChunkClosed is false") @@ -546,22 +547,19 @@ func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { } } - unarchived, firstTime, err := p.unarchiveMetric(1) + unarchived, err := p.unarchiveMetric(1) if err != nil { t.Fatal(err) } if !unarchived { - t.Fatal("expected actual unarchival") + t.Error("expected actual unarchival") } - if firstTime != 2 { - t.Errorf("expected first time 2, got %v", firstTime) - } - unarchived, firstTime, err = p.unarchiveMetric(1) + unarchived, err = p.unarchiveMetric(1) if err != nil { t.Fatal(err) } if unarchived { - t.Fatal("expected no unarchival") + t.Error("expected no unarchival") } expectedFPs = map[clientmodel.Timestamp][]clientmodel.Fingerprint{ @@ -831,16 +829,13 @@ func testIndexing(t *testing.T, encoding chunkEncoding) { verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p) for fp, m := range b.fpToMetric { p.unindexMetric(fp, m) - unarchived, firstTime, err := p.unarchiveMetric(fp) + unarchived, err := p.unarchiveMetric(fp) if err != nil { t.Fatal(err) } if !unarchived { t.Errorf("%d. metric not unarchived", i) } - if firstTime != 1 { - t.Errorf("%d. expected firstTime=1, got %v", i, firstTime) - } delete(indexedFpsToMetrics, fp) } } diff --git a/storage/local/series.go b/storage/local/series.go index 60c428441f..07388a2e5f 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -162,6 +162,9 @@ type memorySeries struct { // first chunk before its chunk desc is evicted. In doubt, this field is // just set to the oldest possible timestamp. savedFirstTime clientmodel.Timestamp + // The timestamp of the last sample in this series. Needed for fast access to + // ensure timestamp monotonicity during ingestion. + lastTime clientmodel.Timestamp // Whether the current head chunk has already been finished. If true, // the current head chunk must not be modified anymore. headChunkClosed bool @@ -175,28 +178,29 @@ type memorySeries struct { } // newMemorySeries returns a pointer to a newly allocated memorySeries for the -// given metric. reallyNew defines if the memorySeries is a genuinely new series -// or (if false) a series for a metric being unarchived, i.e. a series that -// existed before but has been evicted from memory. If reallyNew is false, -// firstTime is ignored (and set to the lowest possible timestamp instead - it -// will be set properly upon the first eviction of chunkDescs). -func newMemorySeries( - m clientmodel.Metric, - reallyNew bool, - firstTime clientmodel.Timestamp, -) *memorySeries { - if !reallyNew { - firstTime = clientmodel.Earliest +// given metric. chunkDescs and modTime in the new series are set according to +// the provided parameters. chunkDescs can be nil or empty if this is a +// genuinely new time series (i.e. not one that is being unarchived). In that +// case, headChunkClosed is set to false, and firstTime and lastTime are both +// set to clientmodel.Earliest. The zero value for modTime can be used if the +// modification time of the series file is unknown (e.g. if this is a genuinely +// new series). +func newMemorySeries(m clientmodel.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries { + firstTime := clientmodel.Earliest + lastTime := clientmodel.Earliest + if len(chunkDescs) > 0 { + firstTime = chunkDescs[0].firstTime() + lastTime = chunkDescs[len(chunkDescs)-1].lastTime() } - s := memorySeries{ - metric: m, - headChunkClosed: !reallyNew, - savedFirstTime: firstTime, + return &memorySeries{ + metric: m, + chunkDescs: chunkDescs, + headChunkClosed: len(chunkDescs) > 0, + savedFirstTime: firstTime, + lastTime: lastTime, + persistWatermark: len(chunkDescs), + modTime: modTime, } - if !reallyNew { - s.chunkDescsOffset = -1 - } - return &s } // add adds a sample pair to the series. It returns the number of newly @@ -231,6 +235,8 @@ func (s *memorySeries) add(v *metric.SamplePair) int { for _, c := range chunks[1:] { s.chunkDescs = append(s.chunkDescs, newChunkDesc(c)) } + + s.lastTime = v.Timestamp return len(chunks) - 1 } @@ -243,7 +249,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool { if s.headChunkClosed { return false } - if time.Now().Sub(s.head().lastTime().Time()) > headChunkTimeout { + if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout { s.headChunkClosed = true // Since we cannot modify the head chunk from now on, we // don't need to bother with cloning anymore. diff --git a/storage/local/storage.go b/storage/local/storage.go index 66b7aa961b..90e1de21a4 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -135,6 +135,7 @@ type memorySeriesStorage struct { numSeries prometheus.Gauge seriesOps *prometheus.CounterVec ingestedSamplesCount prometheus.Counter + outOfOrderSamplesCount prometheus.Counter invalidPreloadRequestsCount prometheus.Counter maintainSeriesDuration *prometheus.SummaryVec } @@ -203,6 +204,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { Name: "ingested_samples_total", Help: "The total number of samples ingested.", }), + outOfOrderSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "out_of_order_samples_total", + Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.", + }), invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -550,6 +557,13 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) { s.fpLocker.Lock(fp) } series := s.getOrCreateSeries(fp, sample.Metric) + + if sample.Timestamp <= series.lastTime { + s.fpLocker.Unlock(fp) + log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime) + s.outOfOrderSamplesCount.Inc() + return + } completedChunksCount := series.add(&metric.SamplePair{ Value: sample.Value, Timestamp: sample.Timestamp, @@ -562,18 +576,30 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) { func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries { series, ok := s.fpToSeries.get(fp) if !ok { - unarchived, firstTime, err := s.persistence.unarchiveMetric(fp) + var cds []*chunkDesc + var modTime time.Time + unarchived, err := s.persistence.unarchiveMetric(fp) if err != nil { - log.Errorf("Error unarchiving fingerprint %v: %v", fp, err) + log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err) } if unarchived { s.seriesOps.WithLabelValues(unarchive).Inc() + // We have to load chunkDescs anyway to do anything with + // the series, so let's do it right now so that we don't + // end up with a series without any chunkDescs for a + // while (which is confusing as it makes the series + // appear as archived or purged). + cds, err = s.loadChunkDescs(fp, clientmodel.Latest) + if err != nil { + log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err) + } + modTime = s.persistence.seriesFileModTime(fp) } else { // This was a genuinely new series, so index the metric. s.persistence.indexMetric(fp, m) s.seriesOps.WithLabelValues(create).Inc() } - series = newMemorySeries(m, !unarchived, firstTime) + series = newMemorySeries(m, cds, modTime) s.fpToSeries.put(fp, series) s.numSeries.Inc() } @@ -943,21 +969,8 @@ func (s *memorySeriesStorage) maintainMemorySeries( if iOldestNotEvicted == -1 { s.fpToSeries.del(fp) s.numSeries.Dec() - // Make sure we have a head chunk descriptor (a freshly - // unarchived series has none). - if len(series.chunkDescs) == 0 { - cds, err := s.loadChunkDescs(fp, clientmodel.Latest) - if err != nil { - log.Errorf( - "Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", - series.metric, err, - ) - return - } - series.chunkDescs = cds - } if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.head().lastTime(), + fp, series.metric, series.firstTime(), series.lastTime, ); err != nil { log.Errorf("Error archiving metric %v: %v", series.metric, err) return @@ -1154,6 +1167,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { ch <- s.numSeries.Desc() s.seriesOps.Describe(ch) ch <- s.ingestedSamplesCount.Desc() + ch <- s.outOfOrderSamplesCount.Desc() ch <- s.invalidPreloadRequestsCount.Desc() ch <- numMemChunksDesc s.maintainSeriesDuration.Describe(ch) @@ -1178,6 +1192,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { ch <- s.numSeries s.seriesOps.Collect(ch) ch <- s.ingestedSamplesCount + ch <- s.outOfOrderSamplesCount ch <- s.invalidPreloadRequestsCount ch <- prometheus.MustNewConstMetric( numMemChunksDesc, diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index da627ac742..e4dfd866e2 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -17,6 +17,7 @@ import ( "fmt" "hash/fnv" "math/rand" + "reflect" "testing" "testing/quick" "time" @@ -377,7 +378,7 @@ func TestRetentionCutoff(t *testing.T) { s.WaitForIndexing() var fp clientmodel.Fingerprint - for f := range s.fingerprintsForLabelPairs(metric.LabelPair{"job", "test"}) { + for f := range s.fingerprintsForLabelPairs(metric.LabelPair{Name: "job", Value: "test"}) { fp = f break } @@ -398,7 +399,7 @@ func TestRetentionCutoff(t *testing.T) { t.Errorf("unexpected result for timestamp before retention period") } - vals = it.RangeValues(metric.Interval{insertStart, now}) + vals = it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) // We get 59 values here because the clientmodel.Now() is slightly later // than our now. if len(vals) != 59 { @@ -408,7 +409,7 @@ func TestRetentionCutoff(t *testing.T) { t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) } - vals = it.BoundaryValues(metric.Interval{insertStart, now}) + vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) if len(vals) != 2 { t.Errorf("expected 2 values but got %d", len(vals)) } @@ -441,7 +442,7 @@ func TestDropMetrics(t *testing.T) { } s.WaitForIndexing() - fps := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"}) + fps := s.fingerprintsForLabelPairs(metric.LabelPair{Name: clientmodel.MetricNameLabel, Value: "test"}) if len(fps) != 2 { t.Fatalf("unexpected number of fingerprints: %d", len(fps)) } @@ -449,7 +450,7 @@ func TestDropMetrics(t *testing.T) { var fpList clientmodel.Fingerprints for fp := range fps { it := s.NewIterator(fp) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Fatalf("unexpected number of samples: %d", len(vals)) } fpList = append(fpList, fp) @@ -458,34 +459,38 @@ func TestDropMetrics(t *testing.T) { s.DropMetricsForFingerprints(fpList[0]) s.WaitForIndexing() - fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"}) + fps2 := s.fingerprintsForLabelPairs(metric.LabelPair{ + Name: clientmodel.MetricNameLabel, Value: "test", + }) if len(fps2) != 1 { t.Fatalf("unexpected number of fingerprints: %d", len(fps2)) } it := s.NewIterator(fpList[0]) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Fatalf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Fatalf("unexpected number of samples: %d", len(vals)) } s.DropMetricsForFingerprints(fpList...) s.WaitForIndexing() - fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{clientmodel.MetricNameLabel, "test"}) + fps3 := s.fingerprintsForLabelPairs(metric.LabelPair{ + Name: clientmodel.MetricNameLabel, Value: "test", + }) if len(fps3) != 0 { t.Fatalf("unexpected number of fingerprints: %d", len(fps3)) } it = s.NewIterator(fpList[0]) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Fatalf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) - if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Fatalf("unexpected number of samples: %d", len(vals)) } } @@ -1405,3 +1410,50 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Sam } return result } + +func TestAppendOutOfOrder(t *testing.T) { + s, closer := NewTestStorage(t, 1) + defer closer.Close() + + m := clientmodel.Metric{ + clientmodel.MetricNameLabel: "out_of_order", + } + + for _, t := range []int{0, 2, 2, 1} { + s.Append(&clientmodel.Sample{ + Metric: m, + Timestamp: clientmodel.Timestamp(t), + Value: clientmodel.SampleValue(t), + }) + } + + fp, err := s.mapper.mapFP(m.FastFingerprint(), m) + if err != nil { + t.Fatal(err) + } + + pl := s.NewPreloader() + defer pl.Close() + + err = pl.PreloadRange(fp, 0, 2, 5*time.Minute) + if err != nil { + t.Fatalf("error preloading chunks: %s", err) + } + + it := s.NewIterator(fp) + + want := metric.Values{ + { + Timestamp: 0, + Value: 0, + }, + { + Timestamp: 2, + Value: 2, + }, + } + got := it.RangeValues(metric.Interval{OldestInclusive: 0, NewestInclusive: 2}) + if !reflect.DeepEqual(want, got) { + t.Fatalf("want %v, got %v", want, got) + } +}