From 5f8e9617efed3c1b5a711e07e971b1df2b418bfa Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Tue, 28 Oct 2014 19:01:41 +0100 Subject: [PATCH] Add more tests. Add an end-to-end fuzz and race test. Fix a race exposed by the above. Change-Id: Ifaa39a90cefbde8d4c29bda197cc92592ded21bb --- storage/local/codable/codable_test.go | 214 +++++++++--------- storage/local/persistence.go | 14 +- storage/local/persistence_test.go | 293 +++++++++++++++++++++++-- storage/local/series.go | 43 ++-- storage/local/storage.go | 10 +- storage/local/storage_test.go | 305 ++++++++++++++++++++++---- 6 files changed, 692 insertions(+), 187 deletions(-) diff --git a/storage/local/codable/codable_test.go b/storage/local/codable/codable_test.go index 710cbf9be..14ee8c15a 100644 --- a/storage/local/codable/codable_test.go +++ b/storage/local/codable/codable_test.go @@ -14,6 +14,7 @@ package codable import ( + "bytes" "encoding" "reflect" "testing" @@ -29,107 +30,122 @@ func newLabelName(ln string) *LabelName { return &cln } -func TestCodec(t *testing.T) { - scenarios := []struct { - in encoding.BinaryMarshaler - out encoding.BinaryUnmarshaler - equal func(in, out interface{}) bool - }{ - { - in: &Metric{ - "label_1": "value_2", - "label_2": "value_2", - "label_3": "value_3", - }, - out: &Metric{}, - }, { - in: newFingerprint(12345), - out: newFingerprint(0), - }, { - in: &Fingerprints{1, 2, 56, 1234}, - out: &Fingerprints{}, - }, { - in: &Fingerprints{1, 2, 56, 1234}, - out: &FingerprintSet{}, - equal: func(in, out interface{}) bool { - inSet := FingerprintSet{} - for _, fp := range *(in.(*Fingerprints)) { - inSet[fp] = struct{}{} - } - return reflect.DeepEqual(inSet, *(out.(*FingerprintSet))) - }, - }, { - in: &FingerprintSet{ - 1: struct{}{}, - 2: struct{}{}, - 56: struct{}{}, - 1234: struct{}{}, - }, - out: &FingerprintSet{}, - }, { - in: &FingerprintSet{ - 1: struct{}{}, - 2: struct{}{}, - 56: struct{}{}, - 1234: struct{}{}, - }, - out: &Fingerprints{}, - equal: func(in, out interface{}) bool { - outSet := FingerprintSet{} - for _, fp := range *(out.(*Fingerprints)) { - outSet[fp] = struct{}{} - } - return reflect.DeepEqual(outSet, *(in.(*FingerprintSet))) - }, - }, { - in: &LabelPair{ - Name: "label_name", - Value: "label_value", - }, - out: &LabelPair{}, - }, { - in: newLabelName("label_name"), - out: newLabelName(""), - }, { - in: &LabelValues{"value_1", "value_2", "value_3"}, - out: &LabelValues{}, - }, { - in: &LabelValues{"value_1", "value_2", "value_3"}, - out: &LabelValueSet{}, - equal: func(in, out interface{}) bool { - inSet := LabelValueSet{} - for _, lv := range *(in.(*LabelValues)) { - inSet[lv] = struct{}{} - } - return reflect.DeepEqual(inSet, *(out.(*LabelValueSet))) - }, - }, { - in: &LabelValueSet{ - "value_1": struct{}{}, - "value_2": struct{}{}, - "value_3": struct{}{}, - }, - out: &LabelValueSet{}, - }, { - in: &LabelValueSet{ - "value_1": struct{}{}, - "value_2": struct{}{}, - "value_3": struct{}{}, - }, - out: &LabelValues{}, - equal: func(in, out interface{}) bool { - outSet := LabelValueSet{} - for _, lv := range *(out.(*LabelValues)) { - outSet[lv] = struct{}{} - } - return reflect.DeepEqual(outSet, *(in.(*LabelValueSet))) - }, - }, { - in: &TimeRange{42, 2001}, - out: &TimeRange{}, - }, +func TestUint64(t *testing.T) { + var b bytes.Buffer + const n = 422010471112345 + if err := EncodeUint64(&b, n); err != nil { + t.Fatal(err) } + got, err := DecodeUint64(&b) + if err != nil { + t.Fatal(err) + } + if got != n { + t.Errorf("want %d, got %d", n, got) + } +} +var scenarios = []struct 
{ + in encoding.BinaryMarshaler + out encoding.BinaryUnmarshaler + equal func(in, out interface{}) bool +}{ + { + in: &Metric{ + "label_1": "value_2", + "label_2": "value_2", + "label_3": "value_3", + }, + out: &Metric{}, + }, { + in: newFingerprint(12345), + out: newFingerprint(0), + }, { + in: &Fingerprints{1, 2, 56, 1234}, + out: &Fingerprints{}, + }, { + in: &Fingerprints{1, 2, 56, 1234}, + out: &FingerprintSet{}, + equal: func(in, out interface{}) bool { + inSet := FingerprintSet{} + for _, fp := range *(in.(*Fingerprints)) { + inSet[fp] = struct{}{} + } + return reflect.DeepEqual(inSet, *(out.(*FingerprintSet))) + }, + }, { + in: &FingerprintSet{ + 1: struct{}{}, + 2: struct{}{}, + 56: struct{}{}, + 1234: struct{}{}, + }, + out: &FingerprintSet{}, + }, { + in: &FingerprintSet{ + 1: struct{}{}, + 2: struct{}{}, + 56: struct{}{}, + 1234: struct{}{}, + }, + out: &Fingerprints{}, + equal: func(in, out interface{}) bool { + outSet := FingerprintSet{} + for _, fp := range *(out.(*Fingerprints)) { + outSet[fp] = struct{}{} + } + return reflect.DeepEqual(outSet, *(in.(*FingerprintSet))) + }, + }, { + in: &LabelPair{ + Name: "label_name", + Value: "label_value", + }, + out: &LabelPair{}, + }, { + in: newLabelName("label_name"), + out: newLabelName(""), + }, { + in: &LabelValues{"value_1", "value_2", "value_3"}, + out: &LabelValues{}, + }, { + in: &LabelValues{"value_1", "value_2", "value_3"}, + out: &LabelValueSet{}, + equal: func(in, out interface{}) bool { + inSet := LabelValueSet{} + for _, lv := range *(in.(*LabelValues)) { + inSet[lv] = struct{}{} + } + return reflect.DeepEqual(inSet, *(out.(*LabelValueSet))) + }, + }, { + in: &LabelValueSet{ + "value_1": struct{}{}, + "value_2": struct{}{}, + "value_3": struct{}{}, + }, + out: &LabelValueSet{}, + }, { + in: &LabelValueSet{ + "value_1": struct{}{}, + "value_2": struct{}{}, + "value_3": struct{}{}, + }, + out: &LabelValues{}, + equal: func(in, out interface{}) bool { + outSet := LabelValueSet{} + for _, lv := range *(out.(*LabelValues)) { + outSet[lv] = struct{}{} + } + return reflect.DeepEqual(outSet, *(in.(*LabelValueSet))) + }, + }, { + in: &TimeRange{42, 2001}, + out: &TimeRange{}, + }, +} + +func TestCodec(t *testing.T) { for i, s := range scenarios { encoded, err := s.in.MarshalBinary() if err != nil { diff --git a/storage/local/persistence.go b/storage/local/persistence.go index b7a56520d..6f6232655 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -86,8 +86,8 @@ type persistence struct { basePath string chunkLen int - // archiveMtx protects the archiving-related methods ArchiveMetric, - // UnarchiveMetric, DropArchiveMetric, and GetFingerprintsModifiedBefore + // archiveMtx protects the archiving-related methods archiveMetric, + // unarchiveMetric, dropArchiveMetric, and getFingerprintsModifiedBefore // from concurrent calls. archiveMtx sync.Mutex @@ -360,7 +360,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)), chunkLastTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), } - if cd.chunkLastTime.After(beforeTime) { + if !cd.chunkLastTime.Before(beforeTime) { // From here on, we have chunkDescs in memory already. break } @@ -373,7 +373,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie // checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping // and all open (non-full) head chunks. 
Do not call concurrently with -// LoadSeriesMapAndHeads. +// loadSeriesMapAndHeads. func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) { glog.Info("Checkpointing in-memory metrics and head chunks...") begin := time.Now() @@ -666,7 +666,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo // getFingerprintsForLabelPair, getLabelValuesForLabelName, and // getFingerprintsModifiedBefore. If the queue is full, this method blocks // until the metric can be queued. This method is goroutine-safe. -func (p *persistence) indexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) { +func (p *persistence) indexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) { p.indexingQueue <- indexingOp{fp, m, add} } @@ -677,7 +677,7 @@ func (p *persistence) indexMetric(m clientmodel.Metric, fp clientmodel.Fingerpri // archived metric. To drop an archived metric, call dropArchivedFingerprint.) // If the queue is full, this method blocks until the metric can be queued. This // method is goroutine-safe. -func (p *persistence) unindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) { +func (p *persistence) unindexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) { p.indexingQueue <- indexingOp{fp, m, remove} } @@ -775,7 +775,7 @@ func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) error { if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil { return err } - p.unindexMetric(metric, fp) + p.unindexMetric(fp, metric) return nil } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 52bfb1d86..262c35bd8 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -25,6 +25,12 @@ import ( "github.com/prometheus/prometheus/utility/test" ) +var ( + m1 = clientmodel.Metric{"label": "value1"} + m2 = clientmodel.Metric{"label": "value2"} + m3 = clientmodel.Metric{"label": "value3"} +) + func newTestPersistence(t *testing.T) (*persistence, test.Closer) { dir := test.NewTemporaryDirectory("test_persistence", t) p, err := newPersistence(dir.Path(), 1024) @@ -40,15 +46,9 @@ func newTestPersistence(t *testing.T) (*persistence, test.Closer) { func buildTestChunks() map[clientmodel.Fingerprint][]chunk { fps := clientmodel.Fingerprints{ - clientmodel.Metric{ - "label": "value1", - }.Fingerprint(), - clientmodel.Metric{ - "label": "value2", - }.Fingerprint(), - clientmodel.Metric{ - "label": "value3", - }.Fingerprint(), + m1.Fingerprint(), + m2.Fingerprint(), + m3.Fingerprint(), } fpToChunks := map[clientmodel.Fingerprint][]chunk{} @@ -75,7 +75,7 @@ func chunksEqual(c1, c2 chunk) bool { return true } -func TestPersistChunk(t *testing.T) { +func TestPersistLoadDropChunks(t *testing.T) { p, closer := newTestPersistence(t) defer closer.Close() @@ -104,9 +104,272 @@ func TestPersistChunk(t *testing.T) { } for _, i := range indexes { if !chunksEqual(expectedChunks[i], actualChunks[i]) { - t.Fatalf("%d. Chunks not equal.", i) + t.Errorf("%d. Chunks not equal.", i) } } + // Load all chunk descs. 
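+		// Per the loadChunkDescs fix in persistence.go above, only descs
+		// whose last time is before the passed-in time are loaded; chunk i
+		// covers exactly timestamp i, so 10 must yield all ten descs and
+		// 5 only the first five.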
+		actualChunkDescs, err := p.loadChunkDescs(fp, 10)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if len(actualChunkDescs) != 10 {
+			t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
+		}
+		for i, cd := range actualChunkDescs {
+			if cd.firstTime() != clientmodel.Timestamp(i) || cd.lastTime() != clientmodel.Timestamp(i) {
+				t.Errorf(
+					"Want ts=%v, got firstTime=%v, lastTime=%v.",
+					i, cd.firstTime(), cd.lastTime(),
+				)
+			}
+		}
+		// Load chunk descs partially.
+		actualChunkDescs, err = p.loadChunkDescs(fp, 5)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if len(actualChunkDescs) != 5 {
+			t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5)
+		}
+		for i, cd := range actualChunkDescs {
+			if cd.firstTime() != clientmodel.Timestamp(i) || cd.lastTime() != clientmodel.Timestamp(i) {
+				t.Errorf(
+					"Want ts=%v, got firstTime=%v, lastTime=%v.",
+					i, cd.firstTime(), cd.lastTime(),
+				)
+			}
+		}
+	}
+	// Drop half of the chunks.
+	for fp, expectedChunks := range fpToChunks {
+		numDropped, allDropped, err := p.dropChunks(fp, 5)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if numDropped != 5 {
+			t.Errorf("want 5 dropped chunks, got %v", numDropped)
+		}
+		if allDropped {
+			t.Error("all chunks dropped")
+		}
+		indexes := make([]int, 5)
+		for i := range indexes {
+			indexes[i] = i
+		}
+		actualChunks, err := p.loadChunks(fp, indexes, 0)
+		if err != nil {
+			t.Fatal(err)
+		}
+		for _, i := range indexes {
+			if !chunksEqual(expectedChunks[i+5], actualChunks[i]) {
+				t.Errorf("%d. Chunks not equal.", i)
+			}
+		}
+	}
+	// Drop all the chunks.
+	for fp := range fpToChunks {
+		numDropped, allDropped, err := p.dropChunks(fp, 100)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if numDropped != 5 {
+			t.Errorf("want 5 dropped chunks, got %v", numDropped)
+		}
+		if !allDropped {
+			t.Error("not all chunks dropped")
+		}
+	}
+}
+
+func TestCheckpointAndLoadSeriesMapAndHeads(t *testing.T) {
+	p, closer := newTestPersistence(t)
+	defer closer.Close()
+
+	fpLocker := newFingerprintLocker(10)
+	sm := newSeriesMap()
+	s1 := newMemorySeries(m1, true)
+	s2 := newMemorySeries(m2, false)
+	s3 := newMemorySeries(m3, false)
+	s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14})
+	s3.add(m3.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7})
+	s3.headChunkPersisted = true
+	sm.put(m1.Fingerprint(), s1)
+	sm.put(m2.Fingerprint(), s2)
+	sm.put(m3.Fingerprint(), s3)
+
+	if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil {
+		t.Fatal(err)
+	}
+
+	loadedSM, err := p.loadSeriesMapAndHeads()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if loadedSM.length() != 2 {
+		t.Errorf("want 2 series in map, got %d", loadedSM.length())
+	}
+	if loadedS1, ok := loadedSM.get(m1.Fingerprint()); ok {
+		if !reflect.DeepEqual(loadedS1.metric, m1) {
+			t.Errorf("want metric %v, got %v", m1, loadedS1.metric)
+		}
+		if !reflect.DeepEqual(loadedS1.head().chunk, s1.head().chunk) {
+			t.Error("head chunks differ")
+		}
+		if loadedS1.chunkDescsOffset != 0 {
+			t.Errorf("want chunkDescsOffset 0, got %d", loadedS1.chunkDescsOffset)
+		}
+		if loadedS1.headChunkPersisted {
+			t.Error("headChunkPersisted is true")
+		}
+	} else {
+		t.Errorf("couldn't find %v in loaded map", m1)
+	}
+	if loadedS3, ok := loadedSM.get(m3.Fingerprint()); ok {
+		if !reflect.DeepEqual(loadedS3.metric, m3) {
+			t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
+		}
+		if loadedS3.head().chunk != nil {
+			t.Error("head chunk not evicted")
+		}
+		if loadedS3.chunkDescsOffset != -1 {
+			t.Errorf("want chunkDescsOffset -1, got %d", loadedS3.chunkDescsOffset)
+		}
+		if !loadedS3.headChunkPersisted {
+			t.Error("headChunkPersisted is false")
+		}
+	} else {
+		t.Errorf("couldn't find %v in loaded map", m3)
+	}
+}
+
+func TestGetFingerprintsModifiedBefore(t *testing.T) {
+	p, closer := newTestPersistence(t)
+	defer closer.Close()
+
+	m1 := clientmodel.Metric{"n1": "v1"}
+	m2 := clientmodel.Metric{"n2": "v2"}
+	m3 := clientmodel.Metric{"n1": "v2"}
+	p.archiveMetric(1, m1, 2, 4)
+	p.archiveMetric(2, m2, 1, 6)
+	p.archiveMetric(3, m3, 5, 5)
+
+	expectedFPs := map[clientmodel.Timestamp][]clientmodel.Fingerprint{
+		0: {},
+		1: {},
+		2: {2},
+		3: {1, 2},
+		4: {1, 2},
+		5: {1, 2},
+		6: {1, 2, 3},
+	}
+
+	for ts, want := range expectedFPs {
+		got, err := p.getFingerprintsModifiedBefore(ts)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !reflect.DeepEqual(want, got) {
+			t.Errorf("timestamp: %v, want FPs %v, got %v", ts, want, got)
+		}
+	}
+
+	unarchived, err := p.unarchiveMetric(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !unarchived {
+		t.Fatal("expected actual unarchival")
+	}
+	unarchived, err = p.unarchiveMetric(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if unarchived {
+		t.Fatal("expected no unarchival")
+	}
+
+	expectedFPs = map[clientmodel.Timestamp][]clientmodel.Fingerprint{
+		0: {},
+		1: {},
+		2: {2},
+		3: {2},
+		4: {2},
+		5: {2},
+		6: {2, 3},
+	}
+
+	for ts, want := range expectedFPs {
+		got, err := p.getFingerprintsModifiedBefore(ts)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !reflect.DeepEqual(want, got) {
+			t.Errorf("timestamp: %v, want FPs %v, got %v", ts, want, got)
+		}
+	}
+}
+
+func TestDropArchivedMetric(t *testing.T) {
+	p, closer := newTestPersistence(t)
+	defer closer.Close()
+
+	m1 := clientmodel.Metric{"n1": "v1"}
+	m2 := clientmodel.Metric{"n2": "v2"}
+	p.archiveMetric(1, m1, 2, 4)
+	p.archiveMetric(2, m2, 1, 6)
+	p.indexMetric(1, m1)
+	p.indexMetric(2, m2)
+	p.waitForIndexing()
+
+	outFPs, err := p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"})
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := clientmodel.Fingerprints{1}
+	if !reflect.DeepEqual(outFPs, want) {
+		t.Errorf("want %#v, got %#v", want, outFPs)
+	}
+	outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"})
+	if err != nil {
+		t.Fatal(err)
+	}
+	want = clientmodel.Fingerprints{2}
+	if !reflect.DeepEqual(outFPs, want) {
+		t.Errorf("want %#v, got %#v", want, outFPs)
+	}
+	if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived {
+		t.Error("want FP 1 archived")
+	}
+	if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
+		t.Error("want FP 2 archived")
+	}
+
+	if err := p.dropArchivedMetric(1); err != nil {
+		t.Fatal(err)
+	}
+	if err := p.dropArchivedMetric(3); err != nil {
+		// Dropping a metric that has not been archived is not an error.
+ t.Fatal(err) + } + p.waitForIndexing() + + outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"}) + if err != nil { + t.Fatal(err) + } + want = nil + if !reflect.DeepEqual(outFPs, want) { + t.Errorf("want %#v, got %#v", want, outFPs) + } + outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"}) + if err != nil { + t.Fatal(err) + } + want = clientmodel.Fingerprints{2} + if !reflect.DeepEqual(outFPs, want) { + t.Errorf("want %#v, got %#v", want, outFPs) + } + if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived { + t.Error("want FP 1 not archived") + } + if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + t.Error("want FP 2 archived") } } @@ -258,7 +521,7 @@ func TestIndexing(t *testing.T) { indexedFpsToMetrics := index.FingerprintMetricMapping{} for i, b := range batches { for fp, m := range b.fpToMetric { - p.indexMetric(m, fp) + p.indexMetric(fp, m) if err := p.archiveMetric(fp, m, 1, 2); err != nil { t.Fatal(err) } @@ -271,7 +534,7 @@ func TestIndexing(t *testing.T) { b := batches[i] verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p) for fp, m := range b.fpToMetric { - p.unindexMetric(m, fp) + p.unindexMetric(fp, m) unarchived, err := p.unarchiveMetric(fp) if err != nil { t.Fatal(err) @@ -331,13 +594,13 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label pair -> fingerprints mappings. for lp, fps := range b.expectedLpToFps { - outFps, err := p.getFingerprintsForLabelPair(lp) + outFPs, err := p.getFingerprintsForLabelPair(lp) if err != nil { t.Fatal(err) } outSet := codable.FingerprintSet{} - for _, fp := range outFps { + for _, fp := range outFPs { outSet[fp] = struct{}{} } diff --git a/storage/local/series.go b/storage/local/series.go index a7b47e5ad..4edcb24bc 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -226,21 +226,22 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []* // series, even if chunks in between were evicted.) // // Special considerations for the head chunk: If it has not been scheduled to be -// persisted yet but is old enough for eviction, this method returns -// persistHeadChunk as true. The caller is then responsible for persisting the -// head chunk. The internal state of this memorySeries is already set -// accordingly by this method. Calling evictOlderThan for a series with a -// non-persisted head chunk that is old enough for eviction will never evict all -// chunks immediately, even if no chunk is pinned for other reasons, because the -// head chunk is not persisted yet. A series old enough for archiving will -// require at least two eviction runs to become ready for archiving: In the -// first run, its head chunk is requested to be persisted. The next call of -// evictOlderThan will then return true, provided that the series hasn't -// received new samples in the meantime, the head chunk has now been persisted, -// and no chunk is pinned for other reasons. +// persisted yet but is old enough for eviction, this method returns a pointer +// to the descriptor of the head chunk to be persisted. (Otherwise, the method +// returns nil.) The caller is then responsible for persisting the head +// chunk. The internal state of this memorySeries is already set accordingly by +// this method. 
Calling evictOlderThan for a series with a non-persisted head
+// chunk that is old enough for eviction will never evict all chunks
+// immediately, even if no chunk is pinned for other reasons, because the head
+// chunk is not persisted yet. A series old enough for archiving will require at
+// least two eviction runs to become ready for archiving: In the first run, its
+// head chunk is requested to be persisted. The next call of evictOlderThan will
+// then return allEvicted==true, provided that the series hasn't received new
+// samples in the meantime, the head chunk has now been persisted, and no chunk
+// is pinned for other reasons.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool, persistHeadChunk bool) {
+func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool, headChunkToPersist *chunkDesc) {
 	allEvicted = true
 	iOldestNotEvicted := -1
 
@@ -269,7 +270,7 @@ func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool,
 			if iOldestNotEvicted == -1 {
 				iOldestNotEvicted = i
 			}
-			return false, false
+			return false, nil
 		}
 		if cd.isEvicted() {
 			continue
@@ -277,7 +278,7 @@ func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool,
 		if !s.headChunkPersisted && i == len(s.chunkDescs)-1 {
 			// This is a non-persisted head chunk that is old enough
 			// for eviction. Request it to be persisted:
-			persistHeadChunk = true
+			headChunkToPersist = cd
 			s.headChunkPersisted = true
 			// Since we cannot modify the head chunk from now on, we
 			// don't need to bother with cloning anymore.
@@ -290,7 +291,7 @@ func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool,
 			allEvicted = false
 		}
 	}
-	return allEvicted, persistHeadChunk
+	return allEvicted, headChunkToPersist
 }
 
 // purgeOlderThan removes chunkDescs older than t. It also evicts the chunks of
@@ -448,10 +449,14 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
 	}
 }
 
+// head returns a pointer to the head chunk descriptor. The caller must have
+// locked the fingerprint of the memorySeries.
 func (s *memorySeries) head() *chunkDesc {
 	return s.chunkDescs[len(s.chunkDescs)-1]
 }
 
+// values returns all values in the series. The caller must have locked the
+// fingerprint of the memorySeries.
 func (s *memorySeries) values() metric.Values {
 	var values metric.Values
 	for _, cd := range s.chunkDescs {
@@ -462,10 +467,14 @@ func (s *memorySeries) values() metric.Values {
 	return values
 }
 
+// firstTime returns the timestamp of the first sample in the series. The caller
+// must have locked the fingerprint of the memorySeries.
 func (s *memorySeries) firstTime() clientmodel.Timestamp {
 	return s.chunkDescs[0].firstTime()
 }
 
+// lastTime returns the timestamp of the last sample in the series. The caller
+// must have locked the fingerprint of the memorySeries.
 func (s *memorySeries) lastTime() clientmodel.Timestamp {
 	return s.head().lastTime()
 }
@@ -539,7 +548,7 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
 		var chunkIt chunkIterator
 		if c.firstTime().After(in.NewestInclusive) {
 			if len(values) == 1 {
-				// We found the first value before, but are now
+				// We found the first value before, but are now
 				// already past the last value. The value we
 				// want must be the last value of the previous
 				// chunk. So backtrack...
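The race fixed by this signature change: the eviction loop in storage.go (next hunk) must queue the head chunk for persistence outside the fingerprint lock, but it used to re-read the head chunk only after unlocking, so a concurrent append could swap in a new head chunk first. A condensed before/after sketch of the caller, simplified from the hunks below (ts stands for the eviction cutoff):

	// Before: racy. head() is re-evaluated after the lock is released.
	s.fpLocker.Lock(m.fp)
	allEvicted, persistHeadChunk := m.series.evictOlderThan(ts)
	s.fpLocker.Unlock(m.fp)
	if persistHeadChunk {
		s.persistQueue <- persistRequest{m.fp, m.series.head()} // unlocked read
	}

	// After: the chunk descriptor is captured while the lock is still held.
	s.fpLocker.Lock(m.fp)
	allEvicted, headChunkToPersist := m.series.evictOlderThan(ts)
	s.fpLocker.Unlock(m.fp)
	if headChunkToPersist != nil {
		s.persistQueue <- persistRequest{m.fp, headChunkToPersist}
	}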
diff --git a/storage/local/storage.go b/storage/local/storage.go
index cd252dc11..a970486af 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -354,7 +354,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
 			s.seriesOps.WithLabelValues(unarchive).Inc()
 		} else {
 			// This was a genuinely new series, so index the metric.
-			s.persistence.indexMetric(m, fp)
+			s.persistence.indexMetric(fp, m)
 			s.seriesOps.WithLabelValues(create).Inc()
 		}
 		series = newMemorySeries(m, !unarchived)
@@ -464,7 +464,7 @@ func (s *memorySeriesStorage) loop() {
 				// Keep going.
 			}
 			s.fpLocker.Lock(m.fp)
-			allEvicted, persistHeadChunk := m.series.evictOlderThan(
+			allEvicted, headChunkToPersist := m.series.evictOlderThan(
 				clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.evictAfter),
 			)
 			if allEvicted {
@@ -480,8 +480,8 @@ func (s *memorySeriesStorage) loop() {
 			}
 			s.fpLocker.Unlock(m.fp)
 			// Queue outside of lock!
-			if persistHeadChunk {
-				s.persistQueue <- persistRequest{m.fp, m.series.head()}
+			if headChunkToPersist != nil {
+				s.persistQueue <- persistRequest{m.fp, headChunkToPersist}
 			}
 		}
 
@@ -544,7 +544,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 			s.fpToSeries.del(fp)
 			s.numSeries.Dec()
 			s.seriesOps.WithLabelValues(memoryPurge).Inc()
-			s.persistence.unindexMetric(series.metric, fp)
+			s.persistence.unindexMetric(fp, series.metric)
 		} else if series.chunkDescsOffset != -1 {
 			series.chunkDescsOffset += numPurged - numDropped
 			if series.chunkDescsOffset < 0 {
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index ef5136109..51d4b7944 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -22,8 +22,43 @@ import (
 	clientmodel "github.com/prometheus/client_golang/model"
 
 	"github.com/prometheus/prometheus/storage/metric"
+	"github.com/prometheus/prometheus/utility/test"
 )
 
+func TestGetFingerprintsForLabelMatchers(t *testing.T) {
+
+}
+
+// TestLoop is just a smoke test for the loop method: check whether we can
+// switch it on and off without disaster.
+func TestLoop(t *testing.T) {
+	samples := make(clientmodel.Samples, 1000)
+	for i := range samples {
+		samples[i] = &clientmodel.Sample{
+			Timestamp: clientmodel.Timestamp(2 * i),
+			Value:     clientmodel.SampleValue(float64(i) * 0.2),
+		}
+	}
+	directory := test.NewTemporaryDirectory("test_storage", t)
+	defer directory.Close()
+	o := &MemorySeriesStorageOptions{
+		MemoryEvictionInterval:     100 * time.Millisecond,
+		MemoryRetentionPeriod:      time.Hour,
+		PersistencePurgeInterval:   150 * time.Millisecond,
+		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
+		PersistenceStoragePath:     directory.Path(),
+		CheckpointInterval:         250 * time.Millisecond,
+	}
+	storage, err := NewMemorySeriesStorage(o)
+	if err != nil {
+		t.Fatalf("Error creating storage: %s", err)
+	}
+	storage.Start()
+	storage.AppendSamples(samples)
+	time.Sleep(time.Second)
+	storage.Stop()
+}
+
 func TestChunk(t *testing.T) {
 	samples := make(clientmodel.Samples, 500000)
 	for i := range samples {
@@ -272,6 +307,127 @@ func TestGetRangeValues(t *testing.T) {
 	}
 }
 
+func TestEvictAndPurgeSeries(t *testing.T) {
+	samples := make(clientmodel.Samples, 1000)
+	for i := range samples {
+		samples[i] = &clientmodel.Sample{
+			Timestamp: clientmodel.Timestamp(2 * i),
+			Value:     clientmodel.SampleValue(float64(i) * 0.2),
+		}
+	}
+	s, closer := NewTestStorage(t)
+	defer closer.Close()
+
+	ms := s.(*memorySeriesStorage) // Going to test the internal purgeSeries method.
+
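+	// All samples above carry the empty metric, so the fingerprint of
+	// clientmodel.Metric{} below addresses the one series under test.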
+	s.AppendSamples(samples)
+
+	fp := clientmodel.Metric{}.Fingerprint()
+
+	// Purge ~half of the chunks.
+	ms.purgeSeries(fp, 1000)
+	it := s.NewIterator(fp)
+	actual := it.GetBoundaryValues(metric.Interval{
+		OldestInclusive: 0,
+		NewestInclusive: 10000,
+	})
+	if len(actual) != 2 {
+		t.Fatal("expected two results after purging half of series")
+	}
+	if actual[0].Timestamp < 800 || actual[0].Timestamp > 1000 {
+		t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp)
+	}
+	want := clientmodel.Timestamp(1998)
+	if actual[1].Timestamp != want {
+		t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
+	}
+
+	// Purge everything.
+	ms.purgeSeries(fp, 10000)
+	it = s.NewIterator(fp)
+	actual = it.GetBoundaryValues(metric.Interval{
+		OldestInclusive: 0,
+		NewestInclusive: 10000,
+	})
+	if len(actual) != 0 {
+		t.Fatal("expected zero results after purging the whole series")
+	}
+
+	// Recreate series.
+	s.AppendSamples(samples)
+
+	series, ok := ms.fpToSeries.get(fp)
+	if !ok {
+		t.Fatal("could not find series")
+	}
+
+	// Evict everything except the head chunk.
+	allEvicted, headChunkToPersist := series.evictOlderThan(1998)
+	// The head chunk is not yet old enough, so we should get (false, nil):
+	if allEvicted {
+		t.Error("allEvicted with head chunk not yet old enough")
+	}
+	if headChunkToPersist != nil {
+		t.Error("headChunkToPersist is not nil although head chunk is not old enough")
+	}
+	// Evict everything.
+	allEvicted, headChunkToPersist = series.evictOlderThan(10000)
+	// Since the head chunk is not yet persisted, we should get (false, non-nil):
+	if allEvicted {
+		t.Error("allEvicted with head chunk not yet persisted")
+	}
+	if headChunkToPersist == nil {
+		t.Error("headChunkToPersist is nil although head chunk is old enough")
+	}
+	// Persist the head chunk as requested.
+	ms.persistQueue <- persistRequest{fp, series.head()}
+	time.Sleep(time.Second) // Give time for persisting to happen.
+	allEvicted, headChunkToPersist = series.evictOlderThan(10000)
+	// Now we should really see everything gone.
+	if !allEvicted {
+		t.Error("not allEvicted")
+	}
+	if headChunkToPersist != nil {
+		t.Error("headChunkToPersist is not nil although already persisted")
+	}
+
+	// Now archive as it would usually be done in the evictTicker loop.
+	ms.fpToSeries.del(fp)
+	if err := ms.persistence.archiveMetric(
+		fp, series.metric, series.firstTime(), series.lastTime(),
+	); err != nil {
+		t.Fatal(err)
+	}
+
+	archived, _, _, err := ms.persistence.hasArchivedMetric(fp)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !archived {
+		t.Fatal("not archived")
+	}
+
+	// Purge ~half of the chunks of an archived series.
+	ms.purgeSeries(fp, 1000)
+	archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !archived {
+		t.Fatal("archived series dropped although only half of the chunks purged")
+	}
+
+	// Purge everything.
+	ms.purgeSeries(fp, 10000)
+	archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if archived {
+		t.Fatal("archived series not dropped")
+	}
+}
+
 func BenchmarkAppend(b *testing.B) {
 	samples := make(clientmodel.Samples, b.N)
 	for i := range samples {
@@ -292,17 +448,22 @@ func BenchmarkAppend(b *testing.B) {
 	s.AppendSamples(samples)
 }
 
+// Append a large number of random samples and then check if we can get them out
+// of the storage alright.
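+// The seed for each run is injected by testing/quick, and check is
+// deterministic for a given seed, so a failure reported by quick.Check can be
+// replayed by calling check directly with the offending seed.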
 func TestFuzz(t *testing.T) {
-	r := rand.New(rand.NewSource(42))
+	if testing.Short() {
+		t.Skip("Skipping test in short mode.")
+	}
 
-	check := func() bool {
+	check := func(seed int64) bool {
+		rand.Seed(seed)
 		s, c := NewTestStorage(t)
 		defer c.Close()
 
-		samples := createRandomSamples(r)
+		samples := createRandomSamples()
 		s.AppendSamples(samples)
 
-		return verifyStorage(t, s, samples, r)
+		return verifyStorage(t, s, samples, 24*7*time.Hour)
 	}
 
 	if err := quick.Check(check, nil); err != nil {
@@ -310,7 +471,55 @@
 	}
 }
 
-func createRandomSamples(r *rand.Rand) clientmodel.Samples {
+// BenchmarkFuzz is the benchmark version of TestFuzz. However, it will run
+// several append and verify operations in parallel, if GOMAXPROCS is set
+// accordingly. Also, the storage options are set such that evictions,
+// checkpoints, and purging will happen concurrently, too. This benchmark will
+// have a very long runtime (up to minutes). You can use it as an actual
+// benchmark. Run it like this:
+//
+// go test -cpu 1,2,4,8 -short -bench BenchmarkFuzz -benchmem
+//
+// You can also use it as a test for races. In that case, run it like this (will
+// make things even slower):
+//
+// go test -race -cpu 8 -short -bench BenchmarkFuzz
+func BenchmarkFuzz(b *testing.B) {
+	b.StopTimer()
+	rand.Seed(42)
+	directory := test.NewTemporaryDirectory("test_storage", b)
+	defer directory.Close()
+	o := &MemorySeriesStorageOptions{
+		MemoryEvictionInterval:     time.Second,
+		MemoryRetentionPeriod:      10 * time.Minute,
+		PersistencePurgeInterval:   10 * time.Second,
+		PersistenceRetentionPeriod: time.Hour,
+		PersistenceStoragePath:     directory.Path(),
+		CheckpointInterval:         3 * time.Second,
+	}
+	s, err := NewMemorySeriesStorage(o)
+	if err != nil {
+		b.Fatalf("Error creating storage: %s", err)
+	}
+	s.Start()
+	defer s.Stop()
+	b.StartTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		var allSamples clientmodel.Samples
+		for pb.Next() {
+			newSamples := createRandomSamples()
+			allSamples = append(allSamples, newSamples[:len(newSamples)/2]...)
+			s.AppendSamples(newSamples[:len(newSamples)/2])
+			verifyStorage(b, s, allSamples, o.PersistenceRetentionPeriod)
+			allSamples = append(allSamples, newSamples[len(newSamples)/2:]...)
+			s.AppendSamples(newSamples[len(newSamples)/2:])
+			verifyStorage(b, s, allSamples, o.PersistenceRetentionPeriod)
+		}
+	})
+}
+
+func createRandomSamples() clientmodel.Samples {
 	type valueCreator func() clientmodel.SampleValue
 	type deltaApplier func(clientmodel.SampleValue) clientmodel.SampleValue
 
@@ -318,49 +527,49 @@
 		maxMetrics         = 5
 		maxCycles          = 500
 		maxStreakLength    = 500
-		timestamp          = time.Now().Unix()
 		maxTimeDelta       = 1000
 		maxTimeDeltaFactor = 10
+		timestamp          = clientmodel.Now() - clientmodel.Timestamp(maxTimeDelta*maxTimeDeltaFactor*maxCycles*maxStreakLength/16) // So that some timestamps are in the future.
 		generators         = []struct {
 			createValue valueCreator
 			applyDelta  []deltaApplier
 		}{
 			{ // "Boolean".
 				createValue: func() clientmodel.SampleValue {
-					return clientmodel.SampleValue(r.Intn(2))
+					return clientmodel.SampleValue(rand.Intn(2))
 				},
 				applyDelta: []deltaApplier{
 					func(_ clientmodel.SampleValue) clientmodel.SampleValue {
-						return clientmodel.SampleValue(r.Intn(2))
+						return clientmodel.SampleValue(rand.Intn(2))
 					},
 				},
 			},
 			{ // Integer with int deltas of various byte length.
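+				// The three applyDelta variants below draw deltas fitting
+				// into 1, 2, and 4 bytes respectively, to exercise the
+				// different delta widths of the chunk encoding.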
createValue: func() clientmodel.SampleValue { - return clientmodel.SampleValue(r.Int63() - 1<<62) + return clientmodel.SampleValue(rand.Int63() - 1<<62) }, applyDelta: []deltaApplier{ func(v clientmodel.SampleValue) clientmodel.SampleValue { - return clientmodel.SampleValue(r.Intn(1<<8) - 1<<7 + int(v)) + return clientmodel.SampleValue(rand.Intn(1<<8) - 1<<7 + int(v)) }, func(v clientmodel.SampleValue) clientmodel.SampleValue { - return clientmodel.SampleValue(r.Intn(1<<16) - 1<<15 + int(v)) + return clientmodel.SampleValue(rand.Intn(1<<16) - 1<<15 + int(v)) }, func(v clientmodel.SampleValue) clientmodel.SampleValue { - return clientmodel.SampleValue(r.Intn(1<<32) - 1<<31 + int(v)) + return clientmodel.SampleValue(rand.Intn(1<<32) - 1<<31 + int(v)) }, }, }, { // Float with float32 and float64 deltas. createValue: func() clientmodel.SampleValue { - return clientmodel.SampleValue(r.NormFloat64()) + return clientmodel.SampleValue(rand.NormFloat64()) }, applyDelta: []deltaApplier{ func(v clientmodel.SampleValue) clientmodel.SampleValue { - return v + clientmodel.SampleValue(float32(r.NormFloat64())) + return v + clientmodel.SampleValue(float32(rand.NormFloat64())) }, func(v clientmodel.SampleValue) clientmodel.SampleValue { - return v + clientmodel.SampleValue(r.NormFloat64()) + return v + clientmodel.SampleValue(rand.NormFloat64()) }, }, }, @@ -370,55 +579,55 @@ func createRandomSamples(r *rand.Rand) clientmodel.Samples { result := clientmodel.Samples{} metrics := []clientmodel.Metric{} - for n := r.Intn(maxMetrics); n >= 0; n-- { + for n := rand.Intn(maxMetrics); n >= 0; n-- { metrics = append(metrics, clientmodel.Metric{ - clientmodel.LabelName(fmt.Sprintf("labelname_%d", n+1)): clientmodel.LabelValue(fmt.Sprintf("labelvalue_%d", n+1)), + clientmodel.LabelName(fmt.Sprintf("labelname_%d", n+1)): clientmodel.LabelValue(fmt.Sprintf("labelvalue_%d", rand.Int())), }) } - for n := r.Intn(maxCycles); n >= 0; n-- { + for n := rand.Intn(maxCycles); n >= 0; n-- { // Pick a metric for this cycle. - metric := metrics[r.Intn(len(metrics))] - timeDelta := r.Intn(maxTimeDelta) + 1 - generator := generators[r.Intn(len(generators))] + metric := metrics[rand.Intn(len(metrics))] + timeDelta := rand.Intn(maxTimeDelta) + 1 + generator := generators[rand.Intn(len(generators))] createValue := generator.createValue - applyDelta := generator.applyDelta[r.Intn(len(generator.applyDelta))] - incTimestamp := func() { timestamp += int64(timeDelta * (r.Intn(maxTimeDeltaFactor) + 1)) } - switch r.Intn(4) { + applyDelta := generator.applyDelta[rand.Intn(len(generator.applyDelta))] + incTimestamp := func() { timestamp += clientmodel.Timestamp(timeDelta * (rand.Intn(maxTimeDeltaFactor) + 1)) } + switch rand.Intn(4) { case 0: // A single sample. result = append(result, &clientmodel.Sample{ Metric: metric, Value: createValue(), - Timestamp: clientmodel.TimestampFromUnix(timestamp), + Timestamp: timestamp, }) incTimestamp() case 1: // A streak of random sample values. - for n := r.Intn(maxStreakLength); n >= 0; n-- { + for n := rand.Intn(maxStreakLength); n >= 0; n-- { result = append(result, &clientmodel.Sample{ Metric: metric, Value: createValue(), - Timestamp: clientmodel.TimestampFromUnix(timestamp), + Timestamp: timestamp, }) incTimestamp() } case 2: // A streak of sample values with incremental changes. 
value := createValue()
-			for n := r.Intn(maxStreakLength); n >= 0; n-- {
+			for n := rand.Intn(maxStreakLength); n >= 0; n-- {
 				result = append(result, &clientmodel.Sample{
 					Metric:    metric,
 					Value:     value,
-					Timestamp: clientmodel.TimestampFromUnix(timestamp),
+					Timestamp: timestamp,
 				})
 				incTimestamp()
 				value = applyDelta(value)
 			}
 		case 3: // A streak of constant sample values.
 			value := createValue()
-			for n := r.Intn(maxStreakLength); n >= 0; n-- {
+			for n := rand.Intn(maxStreakLength); n >= 0; n-- {
 				result = append(result, &clientmodel.Sample{
 					Metric:    metric,
 					Value:     value,
-					Timestamp: clientmodel.TimestampFromUnix(timestamp),
+					Timestamp: timestamp,
 				})
 				incTimestamp()
 			}
@@ -428,28 +637,36 @@
 	return result
 }
 
-func verifyStorage(t *testing.T, s Storage, samples clientmodel.Samples, r *rand.Rand) bool {
-	iters := map[clientmodel.Fingerprint]SeriesIterator{}
+func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge time.Duration) bool {
 	result := true
-	for _, i := range r.Perm(len(samples)) {
+	for _, i := range rand.Perm(len(samples)) {
 		sample := samples[i]
-		fp := sample.Metric.Fingerprint()
-		iter, ok := iters[fp]
-		if !ok {
-			iter = s.NewIterator(fp)
-			iters[fp] = iter
+		if sample.Timestamp.Before(clientmodel.TimestampFromTime(time.Now().Add(-maxAge))) {
+			// TODO: Once we have a guaranteed cutoff at the
+			// retention period, we can verify here that no results
+			// are returned.
+			continue
 		}
-		found := iter.GetValueAtTime(sample.Timestamp)
+		fp := sample.Metric.Fingerprint()
+		p := s.NewPreloader()
+		p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour)
+		found := s.NewIterator(fp).GetValueAtTime(sample.Timestamp)
 		if len(found) != 1 {
-			t.Errorf("Expected exactly one value, found %d.", len(found))
-			return false
+			t.Errorf("Sample %#v: Expected exactly one value, found %d.", sample, len(found))
+			result = false
+			p.Close()
+			continue
 		}
 		want := float64(sample.Value)
 		got := float64(found[0].Value)
-		if want != got {
-			t.Errorf("Value mismatch, want %f, got %f.", want, got)
+		if want != got || sample.Timestamp != found[0].Timestamp {
+			t.Errorf(
+				"Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).",
+				want, sample.Timestamp, got, found[0].Timestamp,
+			)
 			result = false
 		}
+		p.Close()
 	}
 	return result
 }
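The read path exercised by verifyStorage, condensed (same identifiers as in the test above; a sketch with error handling elided, not a literal excerpt): chunks may already have been persisted and evicted from memory, so they have to be pinned through a preloader before an iterator can see their values, and unpinned again afterwards.

	p := s.NewPreloader()
	p.PreloadRange(fp, ts, ts, time.Hour) // pin chunks overlapping [ts, ts]
	found := s.NewIterator(fp).GetValueAtTime(ts)
	p.Close() // unpin the preloaded chunks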