diff --git a/storage/metric/compaction_regression_test.go b/storage/metric/compaction_regression_test.go new file mode 100644 index 000000000..4a2fe93ae --- /dev/null +++ b/storage/metric/compaction_regression_test.go @@ -0,0 +1,244 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "fmt" + "testing" + "flag" + "time" + + "github.com/prometheus/prometheus/storage" + + clientmodel "github.com/prometheus/client_golang/model" +) + +type nopCurationStateUpdater struct{} +func (n *nopCurationStateUpdater) UpdateCurationState(*CurationState) {} + +func generateTestSamples(endTime time.Time, numTs int, samplesPerTs int, interval time.Duration) clientmodel.Samples { + samples := clientmodel.Samples{} + + startTime := endTime.Add(-interval * time.Duration(samplesPerTs-1)) + for ts := 0; ts < numTs; ts++ { + metric := clientmodel.Metric{} + metric["name"] = clientmodel.LabelValue(fmt.Sprintf("metric_%d", ts)) + for i := 0; i < samplesPerTs; i++ { + sample := &clientmodel.Sample{ + Metric: metric, + Value: clientmodel.SampleValue(ts + 1000 * i), + Timestamp: startTime.Add(interval * time.Duration(i)), + } + samples = append(samples, sample) + } + } + return samples +} + +type compactionChecker struct { + t *testing.T + sampleIdx int + numChunks int + expectedSamples clientmodel.Samples +} + +func (c *compactionChecker) Operate(key, value interface{}) *storage.OperatorError { + c.numChunks++ + sampleKey := key.(*SampleKey) + if sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) { + c.t.Fatalf("Chunk FirstTimestamp (%v) is after LastTimestamp (%v): %v\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey) + } + for _, sample := range value.(Values) { + if sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp) { + c.t.Fatalf("Sample not within chunk boundaries: chunk FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp) + } + + expected := c.expectedSamples[c.sampleIdx] + + fp := &clientmodel.Fingerprint{} + fp.LoadFromMetric(expected.Metric) + if !sampleKey.Fingerprint.Equal(fp) { + c.t.Fatalf("%d. Expected fingerprint %s, got %s", c.sampleIdx, fp, sampleKey.Fingerprint) + } + + sp := &SamplePair{ + Value: expected.Value, + Timestamp: expected.Timestamp, + } + if !sample.Equal(sp) { + c.t.Fatalf("%d. Expected sample %s, got %s", c.sampleIdx, sp, sample) + } + c.sampleIdx++ + } + return nil +} + + +func checkStorageSaneAndEquivalent(t *testing.T, name string, ts *TieredStorage, samples clientmodel.Samples, expectedNumChunks int) { + cc := &compactionChecker{ + expectedSamples: samples, + t: t, + } + entire, err := ts.DiskStorage.MetricSamples.ForEach(&MetricSamplesDecoder{}, &AcceptAllFilter{}, cc) + if err != nil { + t.Fatalf("%s: Error dumping samples: %s", name, err) + } + if !entire { + t.Fatalf("%s: Didn't scan entire corpus", name) + } + if cc.numChunks != expectedNumChunks { + t.Fatalf("%s: Expected %d chunks, got %d", name, expectedNumChunks, cc.numChunks) + } +} + +type compactionTestScenario struct { + leveldbChunkSize int + numTimeseries int + samplesPerTs int + + ignoreYoungerThan time.Duration + maximumMutationPoolBatch int + minimumGroupSize int + + uncompactedChunks int + compactedChunks int +} + +func (s compactionTestScenario) run(t *testing.T) { + flag.Set("leveldbChunkSize", fmt.Sprintf("%d", s.leveldbChunkSize)) + defer flag.Set("leveldbChunkSize", "200") + + ts, closer := NewTestTieredStorage(t) + defer closer.Close() + + // 1. Store test values. + samples := generateTestSamples(testInstant, s.numTimeseries, s.samplesPerTs, time.Minute) + ts.AppendSamples(samples) + ts.Flush() + + // 2. Check sanity of uncompacted values. + checkStorageSaneAndEquivalent(t, "Before compaction", ts, samples, s.uncompactedChunks) + + // 3. Compact test storage. + processor := NewCompactionProcessor(&CompactionProcessorOptions{ + MaximumMutationPoolBatch: s.maximumMutationPoolBatch, + MinimumGroupSize: s.minimumGroupSize, + }) + defer processor.Close() + + curator := NewCurator(&CuratorOptions{ + Stop: make(chan bool), + ViewQueue: ts.ViewQueue, + }) + defer curator.Close() + + fmt.Println("test instant:", testInstant) + err := curator.Run(s.ignoreYoungerThan, testInstant, processor, ts.DiskStorage.CurationRemarks, ts.DiskStorage.MetricSamples, ts.DiskStorage.MetricHighWatermarks, &nopCurationStateUpdater{}) + if err != nil { + t.Fatalf("Failed to run curator: %s", err) + } + + // 4. Check sanity of compacted values. + checkStorageSaneAndEquivalent(t, "After compaction", ts, samples, s.compactedChunks) +} + +func TestCompaction(t *testing.T) { + scenarios := []compactionTestScenario{ + // BEFORE COMPACTION: + // + // Chunk size | Fingerprint | Samples + // 5 | A | 1 .. 5 + // 5 | A | 6 .. 10 + // 5 | A | 11 .. 15 + // 5 | B | 1 .. 5 + // 5 | B | 6 .. 10 + // 5 | B | 11 .. 15 + // 5 | C | 1 .. 5 + // 5 | C | 6 .. 10 + // 5 | C | 11 .. 15 + // + // AFTER COMPACTION: + // + // Chunk size | Fingerprint | Samples + // 10 | A | 1 .. 10 + // 5 | A | 11 .. 15 + // 10 | B | 1 .. 10 + // 5 | B | 11 .. 15 + // 10 | C | 1 .. 10 + // 5 | C | 11 .. 15 + { + leveldbChunkSize: 5, + numTimeseries: 3, + samplesPerTs: 15, + + ignoreYoungerThan: time.Minute, + maximumMutationPoolBatch: 30, + minimumGroupSize: 10, + + uncompactedChunks: 9, + compactedChunks: 6, + }, + // BEFORE COMPACTION: + // + // Chunk size | Fingerprint | Samples + // 5 | A | 1 .. 5 + // 5 | A | 6 .. 10 + // 5 | A | 11 .. 15 + // 5 | B | 1 .. 5 + // 5 | B | 6 .. 10 + // 5 | B | 11 .. 15 + // 5 | C | 1 .. 5 + // 5 | C | 6 .. 10 + // 5 | C | 11 .. 15 + // + // AFTER COMPACTION: + // + // Chunk size | Fingerprint | Samples + // 10 | A | 1 .. 15 + // 10 | B | 1 .. 15 + // 10 | C | 1 .. 15 + { + leveldbChunkSize: 5, + numTimeseries: 3, + samplesPerTs: 15, + + ignoreYoungerThan: time.Minute, + maximumMutationPoolBatch: 30, + minimumGroupSize: 30, + + uncompactedChunks: 9, + compactedChunks: 3, + }, + // BUG: This case crashes! See: + // https://github.com/prometheus/prometheus/issues/368 + // Fix this! + // + //{ + // leveldbChunkSize: 5, + // numTimeseries: 3, + // samplesPerTs: 20, + + // ignoreYoungerThan: time.Minute, + // maximumMutationPoolBatch: 30, + // minimumGroupSize: 10, + + // uncompactedChunks: 12, + // compactedChunks: 9, + //}, + } + + for _, s := range scenarios { + s.run(t) + } +}