diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index 35f8c0b8d..a8ac2a34a 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -143,7 +143,7 @@ func init() { ) cfg.fs.Var( &local.DefaultChunkEncoding, "storage.local.chunk-encoding-version", - "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).", + "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding), 1 (double-delta encoding), and 2 (double-delta encoding with variable bit-width).", ) // Index cache sizes. cfg.fs.IntVar( diff --git a/promql/test.go b/promql/test.go index 26a9a4169..2ca105efa 100644 --- a/promql/test.go +++ b/promql/test.go @@ -469,7 +469,7 @@ func (t *Test) clear() { } var closer testutil.Closer - t.storage, closer = local.NewTestStorage(t, 1) + t.storage, closer = local.NewTestStorage(t, 2) t.closeStorage = closer.Close t.queryEngine = NewEngine(t.storage, nil) diff --git a/rules/recording_test.go b/rules/recording_test.go index 9e9929e7d..310df2fe4 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -24,7 +24,7 @@ import ( ) func TestRuleEval(t *testing.T) { - storage, closer := local.NewTestStorage(t, 1) + storage, closer := local.NewTestStorage(t, 2) defer closer.Close() engine := promql.NewEngine(storage, nil) now := model.Now() diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 2e9fcf4cf..cceb1f5cf 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -15,6 +15,7 @@ package local import ( "container/list" + "errors" "fmt" "io" "sort" @@ -26,9 +27,11 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -// The DefaultChunkEncoding can be changed via a flag. +// DefaultChunkEncoding can be changed via a flag. var DefaultChunkEncoding = doubleDelta +var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries") + type chunkEncoding byte // String implements flag.Value. @@ -43,6 +46,8 @@ func (ce *chunkEncoding) Set(s string) error { *ce = delta case "1": *ce = doubleDelta + case "2": + *ce = varbit default: return fmt.Errorf("invalid chunk encoding: %s", s) } @@ -52,6 +57,7 @@ func (ce *chunkEncoding) Set(s string) error { const ( delta chunkEncoding = iota doubleDelta + varbit ) // chunkDesc contains meta-data for a chunk. Pay special attention to the @@ -285,7 +291,7 @@ type chunkIterator interface { // of the find... methods). It returns ZeroSamplePair before any of // those methods were called. value() model.SamplePair - // Returns the last error encountered. In general, an error signal data + // Returns the last error encountered. In general, an error signals data // corruption in the chunk and requires quarantining. err() error } @@ -306,6 +312,21 @@ func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, erro return result, it.err() } +// addToOverflowChunk is a utility function that creates a new chunk as overflow +// chunk, adds the provided sample to it, and returns a chunk slice containing +// the provided old chunk followed by the new overflow chunk. +func addToOverflowChunk(c chunk, s model.SamplePair) ([]chunk, error) { + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{c, overflowChunks[0]}, nil +} + +// transcodeAndAdd is a utility function that transcodes the dst chunk into the +// provided src chunk (plus the necessary overflow chunks) and then adds the +// provided sample. 
It returns the new chunks (transcoded plus overflow) with +// the new sample at the end. func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() @@ -334,7 +355,7 @@ func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) } // newChunk creates a new chunk according to the encoding set by the -// defaultChunkEncoding flag. +// DefaultChunkEncoding flag. func newChunk() chunk { chunk, err := newChunkForEncoding(DefaultChunkEncoding) if err != nil { @@ -349,6 +370,8 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil case doubleDelta: return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil + case varbit: + return newVarbitChunk(varbitZeroEncoding), nil default: return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } diff --git a/storage/local/delta.go b/storage/local/delta.go index a74e0806a..0ab2f0d9d 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -74,6 +74,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod // add implements chunk. func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { + // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. if c.len() == 0 { c = c[:deltaHeaderBytes] binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) @@ -86,11 +87,7 @@ func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks, err := newChunk().add(s) - if err != nil { - return nil, err - } - return []chunk{&c, overflowChunks[0]}, nil + return addToOverflowChunk(&c, s) } baseValue := c.baseValue() @@ -130,11 +127,7 @@ func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) } // Chunk is already half full. Better create a new one and save the transcoding efforts. - overflowChunks, err := newChunk().add(s) - if err != nil { - return nil, err - } - return []chunk{&c, overflowChunks[0]}, nil + return addToOverflowChunk(&c, s) } offset := len(c) @@ -328,8 +321,8 @@ func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time { return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) + return model.Earliest } - return model.Earliest } func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { @@ -348,6 +341,7 @@ func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleVa // No d8 for ints. 
default: acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) + return 0 } } else { switch acc.vBytes { @@ -358,7 +352,7 @@ func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleVa return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) + return 0 } } - return 0 } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index a53d41f6b..7e7eed9e3 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -81,6 +81,7 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub // add implements chunk. func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { + // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. if c.len() == 0 { return c.addFirstSample(s), nil } @@ -98,11 +99,7 @@ func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks, err := newChunk().add(s) - if err != nil { - return nil, err - } - return []chunk{&c, overflowChunks[0]}, nil + return addToOverflowChunk(&c, s) } projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta() @@ -136,11 +133,7 @@ func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) } // Chunk is already half full. Better create a new one and save the transcoding efforts. - overflowChunks, err := newChunk().add(s) - if err != nil { - return nil, err - } - return []chunk{&c, overflowChunks[0]}, nil + return addToOverflowChunk(&c, s) } offset := len(c) @@ -452,8 +445,8 @@ func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time return model.Time(binary.LittleEndian.Uint64(acc.c[offset:])) default: acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes) + return model.Earliest } - return model.Earliest } func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue { @@ -491,6 +484,7 @@ func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.Sa // No d8 for ints. 
default: acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes) + return 0 } } else { switch acc.vBytes { @@ -503,7 +497,7 @@ func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.Sa return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:]))) default: acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes) + return 0 } } - return 0 } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 692f494d5..f3f4dcea8 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -653,6 +653,10 @@ func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) { testCheckpointAndLoadSeriesMapAndHeads(t, 1) } +func TestCheckpointAndLoadSeriesMapAndHeadsChunkType2(t *testing.T) { + testCheckpointAndLoadSeriesMapAndHeads(t, 2) +} + func TestCheckpointAndLoadFPMappings(t *testing.T) { p, closer := newTestPersistence(t, 1) defer closer.Close() @@ -758,6 +762,10 @@ func TestFingerprintsModifiedBeforeChunkType1(t *testing.T) { testFingerprintsModifiedBefore(t, 1) } +func TestFingerprintsModifiedBeforeChunkType2(t *testing.T) { + testFingerprintsModifiedBefore(t, 2) +} + func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -822,6 +830,10 @@ func TestDropArchivedMetricChunkType1(t *testing.T) { testDropArchivedMetric(t, 1) } +func TestDropArchivedMetricChunkType2(t *testing.T) { + testDropArchivedMetric(t, 2) +} + type incrementalBatch struct { fpToMetric index.FingerprintMetricMapping expectedLnToLvs index.LabelNameLabelValuesMapping @@ -1002,6 +1014,10 @@ func TestIndexingChunkType1(t *testing.T) { testIndexing(t, 1) } +func TestIndexingChunkType2(t *testing.T) { + testIndexing(t, 2) +} + func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *persistence) { p.waitForIndexing() for fp, m := range indexedFpsToMetrics { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index f305e792f..a9b0fe5be 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -31,7 +31,7 @@ import ( ) func TestMatches(t *testing.T) { - storage, closer := NewTestStorage(t, 1) + storage, closer := NewTestStorage(t, 2) defer closer.Close() storage.archiveHighWatermark = 90 @@ -266,7 +266,7 @@ func TestMatches(t *testing.T) { } func TestFingerprintsForLabels(t *testing.T) { - storage, closer := NewTestStorage(t, 1) + storage, closer := NewTestStorage(t, 2) defer closer.Close() samples := make([]*model.Sample, 100) @@ -348,7 +348,7 @@ func TestFingerprintsForLabels(t *testing.T) { var benchLabelMatchingRes map[model.Fingerprint]metric.Metric func BenchmarkLabelMatching(b *testing.B) { - s, closer := NewTestStorage(b, 1) + s, closer := NewTestStorage(b, 2) defer closer.Close() h := fnv.New64a() @@ -444,7 +444,7 @@ func TestRetentionCutoff(t *testing.T) { now := model.Now() insertStart := now.Add(-2 * time.Hour) - s, closer := NewTestStorage(t, 1) + s, closer := NewTestStorage(t, 2) defer closer.Close() // Stop maintenance loop to prevent actual purging. 
@@ -498,7 +498,7 @@ func TestDropMetrics(t *testing.T) { now := model.Now() insertStart := now.Add(-2 * time.Hour) - s, closer := NewTestStorage(t, 1) + s, closer := NewTestStorage(t, 2) defer closer.Close() chunkFileExists := func(fp model.Fingerprint) (bool, error) { @@ -605,7 +605,7 @@ func TestQuarantineMetric(t *testing.T) { now := model.Now() insertStart := now.Add(-2 * time.Hour) - s, closer := NewTestStorage(t, 1) + s, closer := NewTestStorage(t, 2) defer closer.Close() chunkFileExists := func(fp model.Fingerprint) (bool, error) { @@ -749,7 +749,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) { for m := range s.fpToSeries.iter() { s.fpLocker.Lock(m.fp) - + defer s.fpLocker.Unlock(m.fp) // TODO remove, see below var values []model.SamplePair for _, cd := range m.series.chunkDescs { if cd.isEvicted() { @@ -772,7 +772,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) { t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value) } } - s.fpLocker.Unlock(m.fp) + //s.fpLocker.Unlock(m.fp) } log.Info("test done, closing") } @@ -785,6 +785,10 @@ func TestChunkType1(t *testing.T) { testChunk(t, 1) } +func TestChunkType2(t *testing.T) { + testChunk(t, 2) +} + func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { @@ -859,6 +863,10 @@ func TestValueAtTimeChunkType1(t *testing.T) { testValueAtOrBeforeTime(t, 1) } +func TestValueAtTimeChunkType2(t *testing.T) { + testValueAtOrBeforeTime(t, 2) +} + func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { @@ -937,6 +945,10 @@ func BenchmarkValueAtTimeChunkType1(b *testing.B) { benchmarkValueAtOrBeforeTime(b, 1) } +func BenchmarkValueAtTimeChunkType2(b *testing.B) { + benchmarkValueAtOrBeforeTime(b, 2) +} + func testRangeValues(t *testing.T, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { @@ -1089,6 +1101,10 @@ func TestRangeValuesChunkType1(t *testing.T) { testRangeValues(t, 1) } +func TestRangeValuesChunkType2(t *testing.T) { + testRangeValues(t, 2) +} + func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { @@ -1133,6 +1149,10 @@ func BenchmarkRangeValuesChunkType1(b *testing.B) { benchmarkRangeValues(b, 1) } +func BenchmarkRangeValuesChunkType2(b *testing.B) { + benchmarkRangeValues(b, 2) +} + func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { @@ -1284,6 +1304,10 @@ func TestEvictAndPurgeSeriesChunkType1(t *testing.T) { testEvictAndPurgeSeries(t, 1) } +func TestEvictAndPurgeSeriesChunkType2(t *testing.T) { + testEvictAndPurgeSeries(t, 2) +} + func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { @@ -1386,6 +1410,10 @@ func BenchmarkAppendType1(b *testing.B) { benchmarkAppend(b, 1) } +func BenchmarkAppendType2(b *testing.B) { + benchmarkAppend(b, 2) +} + // Append a large number of random samples and then check if we can get them out // of the storage alright. 
func testFuzz(t *testing.T, encoding chunkEncoding) { @@ -1402,7 +1430,10 @@ func testFuzz(t *testing.T, encoding chunkEncoding) { for _, sample := range samples { s.Append(sample) } - return verifyStorage(t, s, samples, 24*7*time.Hour) + if !verifyStorageRandom(t, s, samples) { + return false + } + return verifyStorageSequential(t, s, samples) } if err := quick.Check(check, nil); err != nil { @@ -1418,6 +1449,10 @@ func TestFuzzChunkType1(t *testing.T) { testFuzz(t, 1) } +func TestFuzzChunkType2(t *testing.T) { + testFuzz(t, 2) +} + // benchmarkFuzz is the benchmark version of testFuzz. The storage options are // set such that evictions, checkpoints, and purging will happen concurrently, // too. This benchmark will have a very long runtime (up to minutes). You can @@ -1462,11 +1497,12 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { for _, sample := range samples[start:middle] { s.Append(sample) } - verifyStorage(b, s.(*memorySeriesStorage), samples[:middle], o.PersistenceRetentionPeriod) + verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:middle]) for _, sample := range samples[middle:end] { s.Append(sample) } - verifyStorage(b, s.(*memorySeriesStorage), samples[:end], o.PersistenceRetentionPeriod) + verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:end]) + verifyStorageSequential(b, s.(*memorySeriesStorage), samples) } } @@ -1478,17 +1514,20 @@ func BenchmarkFuzzChunkType1(b *testing.B) { benchmarkFuzz(b, 1) } +func BenchmarkFuzzChunkType2(b *testing.B) { + benchmarkFuzz(b, 2) +} + func createRandomSamples(metricName string, minLen int) model.Samples { type valueCreator func() model.SampleValue type deltaApplier func(model.SampleValue) model.SampleValue var ( - maxMetrics = 5 - maxStreakLength = 500 - maxTimeDelta = 10000 - maxTimeDeltaFactor = 10 - timestamp = model.Now() - model.Time(maxTimeDelta*maxTimeDeltaFactor*minLen/4) // So that some timestamps are in the future. - generators = []struct { + maxMetrics = 5 + maxStreakLength = 2000 + maxTimeDelta = 10000 + timestamp = model.Now() - model.Time(maxTimeDelta*minLen) // So that some timestamps are in the future. + generators = []struct { createValue valueCreator applyDelta []deltaApplier }{ @@ -1532,6 +1571,28 @@ func createRandomSamples(metricName string, minLen int) model.Samples { }, }, } + timestampIncrementers = []func(baseDelta model.Time) model.Time{ + // Regular increments. + func(delta model.Time) model.Time { + return delta + }, + // Jittered increments. σ is 1/100 of delta, e.g. 10ms for 10s scrape interval. + func(delta model.Time) model.Time { + return delta + model.Time(rand.NormFloat64()*float64(delta)/100) + }, + // Regular increments, but missing a scrape with 10% chance. + func(delta model.Time) model.Time { + i := rand.Intn(100) + if i < 90 { + return delta + } + if i < 99 { + return 2 * delta + } + return 3 * delta + // Ignoring the case with more than two missed scrapes in a row. + }, + } ) // Prefill result with two samples with colliding metrics (to test fingerprint mapping). @@ -1563,13 +1624,16 @@ func createRandomSamples(metricName string, minLen int) model.Samples { } for len(result) < minLen { - // Pick a metric for this cycle. 
- metric := metrics[rand.Intn(len(metrics))] - timeDelta := rand.Intn(maxTimeDelta) + 1 - generator := generators[rand.Intn(len(generators))] - createValue := generator.createValue - applyDelta := generator.applyDelta[rand.Intn(len(generator.applyDelta))] - incTimestamp := func() { timestamp += model.Time(timeDelta * (rand.Intn(maxTimeDeltaFactor) + 1)) } + var ( + // Pick a metric for this cycle. + metric = metrics[rand.Intn(len(metrics))] + timeDelta = model.Time(rand.Intn(maxTimeDelta) + 1) + generator = generators[rand.Intn(len(generators))] + createValue = generator.createValue + applyDelta = generator.applyDelta[rand.Intn(len(generator.applyDelta))] + incTimestamp = timestampIncrementers[rand.Intn(len(timestampIncrementers))] + ) + switch rand.Intn(4) { case 0: // A single sample. result = append(result, &model.Sample{ @@ -1577,7 +1641,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { Value: createValue(), Timestamp: timestamp, }) - incTimestamp() + timestamp += incTimestamp(timeDelta) case 1: // A streak of random sample values. for n := rand.Intn(maxStreakLength); n >= 0; n-- { result = append(result, &model.Sample{ @@ -1585,7 +1649,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { Value: createValue(), Timestamp: timestamp, }) - incTimestamp() + timestamp += incTimestamp(timeDelta) } case 2: // A streak of sample values with incremental changes. value := createValue() @@ -1595,7 +1659,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { Value: value, Timestamp: timestamp, }) - incTimestamp() + timestamp += incTimestamp(timeDelta) value = applyDelta(value) } case 3: // A streak of constant sample values. @@ -1606,7 +1670,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { Value: value, Timestamp: timestamp, }) - incTimestamp() + timestamp += incTimestamp(timeDelta) } } } @@ -1614,34 +1678,32 @@ func createRandomSamples(metricName string, minLen int) model.Samples { return result } -func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, maxAge time.Duration) bool { +func verifyStorageRandom(t testing.TB, s *memorySeriesStorage, samples model.Samples) bool { s.WaitForIndexing() result := true for _, i := range rand.Perm(len(samples)) { sample := samples[i] - if sample.Timestamp.Before(model.TimeFromUnixNano(time.Now().Add(-maxAge).UnixNano())) { - continue - // TODO: Once we have a guaranteed cutoff at the - // retention period, we can verify here that no results - // are returned. - } fp, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) if err != nil { t.Fatal(err) } p := s.NewPreloader() - it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) + it := p.PreloadInstant(fp, sample.Timestamp, 0) found := it.ValueAtOrBeforeTime(sample.Timestamp) - if found.Timestamp == model.Earliest { - t.Errorf("Sample %#v: Expected sample not found.", sample) + startTime := it.(*boundedIterator).start + switch { + case found.Timestamp != model.Earliest && sample.Timestamp.Before(startTime): + t.Errorf("Sample #%d %#v: Expected outdated sample to be excluded.", i, sample) result = false - p.Close() - continue - } - if sample.Value != found.Value || sample.Timestamp != found.Timestamp { + case found.Timestamp == model.Earliest && !sample.Timestamp.Before(startTime): + t.Errorf("Sample #%d %#v: Expected sample not found.", i, sample) + result = false + case found.Timestamp == model.Earliest && sample.Timestamp.Before(startTime): + // All good. 
Outdated sample dropped. + case sample.Value != found.Value || sample.Timestamp != found.Timestamp: t.Errorf( - "Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", - sample.Value, sample.Timestamp, found.Value, found.Timestamp, + "Sample #%d %#v: Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", + i, sample, sample.Value, sample.Timestamp, found.Value, found.Timestamp, ) result = false } @@ -1650,8 +1712,62 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, return result } +func verifyStorageSequential(t testing.TB, s *memorySeriesStorage, samples model.Samples) bool { + s.WaitForIndexing() + var ( + result = true + fp model.Fingerprint + p = s.NewPreloader() + it SeriesIterator + r []model.SamplePair + j int + ) + defer func() { + p.Close() + }() + for i, sample := range samples { + newFP, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) + if err != nil { + t.Fatal(err) + } + if it == nil || newFP != fp { + fp = newFP + p.Close() + p = s.NewPreloader() + it = p.PreloadRange(fp, sample.Timestamp, model.Latest) + r = it.RangeValues(metric.Interval{ + OldestInclusive: sample.Timestamp, + NewestInclusive: model.Latest, + }) + j = -1 + } + startTime := it.(*boundedIterator).start + if sample.Timestamp.Before(startTime) { + continue + } + j++ + if j >= len(r) { + t.Errorf( + "Sample #%d %v not found.", + i, sample, + ) + result = false + continue + } + found := r[j] + if sample.Value != found.Value || sample.Timestamp != found.Timestamp { + t.Errorf( + "Sample #%d %v: Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", + i, sample, sample.Value, sample.Timestamp, found.Value, found.Timestamp, + ) + result = false + } + } + return result +} + func TestAppendOutOfOrder(t *testing.T) { - s, closer := NewTestStorage(t, 1) + s, closer := NewTestStorage(t, 2) defer closer.Close() m := model.Metric{ diff --git a/storage/local/varbit.go b/storage/local/varbit.go new file mode 100644 index 000000000..0555cb92d --- /dev/null +++ b/storage/local/varbit.go @@ -0,0 +1,1190 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "encoding/binary" + "fmt" + "io" + "math" + + "github.com/prometheus/common/model" +) + +// The varbit chunk encoding is broadly similar to the double-delta +// chunks. However, it uses a number of different bit-widths to save the +// double-deltas (rather than 1, 2, or 4 bytes). Also, it doesn't use the delta +// of the first two samples of a chunk as the base delta, but uses a "sliding" +// delta, i.e. the delta of the two previous samples. Both differences make +// random access more expensive. Sample values can be encoded with the same +// double-delta scheme as timestamps, but different value encodings can be +// chosen adaptively, among them XOR encoding and "zero" encoding for constant +// sample values. 
Overall, the varbit encoding results in a much better +// compression ratio (~1.3 bytes per sample compared to ~3.3 bytes per sample +// with double-delta encoding, for typical data sets). +// +// Major parts of the varbit encoding are inspired by the following paper: +// Gorilla: A Fast, Scalable, In-Memory Time Series Database +// T. Pelkonen et al., Facebook Inc. +// http://www.vldb.org/pvldb/vol8/p1816-teller.pdf +// Note that there are significant differences, some due to the way Prometheus +// chunks work, others to optimize for the Prometheus use-case. +// +// Layout of a 1024 byte varbit chunk (big endian, wherever it matters): +// - first time (int64): 8 bytes bit 0000-0063 +// - first value (float64): 8 bytes bit 0064-0127 +// - last time (int64): 8 bytes bit 0128-0191 +// - last value (float64): 8 bytes bit 0192-0255 +// - first Δt (t1-t0, unsigned): 3 bytes bit 0256-0279 +// - flags (byte) 1 byte bit 0280-0287 +// - bit offset for next sample 2 bytes bit 0288-0303 +// - first Δv for value encoding 1, otherwise payload +// 4 bytes bit 0304-0335 +// - payload 973 bytes bit 0336-8119 +// The following only exists if the chunk is still open. Otherwise, it might be +// used by payload. +// - bit offset for current ΔΔt=0 count 2 bytes bit 8120-8135 +// - last Δt 3 bytes bit 8136-8159 +// - special bytes for value encoding 4 bytes bit 8160-8191 +// - for encoding 1: last Δv 4 bytes bit 8160-8191 +// - for encoding 2: count of +// - last leading zeros (1 byte) 1 byte bit 8160-8167 +// - last significant bits (1 byte) 1 byte bit 8168-8175 +// +// FLAGS +// +// The two least significant bits of the flags byte define the value encoding +// for the whole chunk, see below. The most significant byte of the flags byte +// is set if the chunk is closed. No samples can be added anymore to a closed +// chunk. Furthermore, the last value of a closed chunk is only saved in the +// header (last time, last value), while in a chunk that is still open, the last +// sample in the payload is the same sample as saved in the header. +// +// The remaining bits in the flags byte are currently unused. +// +// TIMESTAMP ENCODING +// +// The 1st timestamp is saved directly. +// +// The difference to the 2nd timestamp is saved as first Δt. 3 bytes is enough +// for about 4.5h. Since we close a chunk after sitting idle for 1h, this +// limitation has no practical consequences. Should, for whatever reason, a +// larger delta be required, the chunk would be closed, i.e. the new sample is +// added as the last sample to the chunk, and the next sample will be added to a +// new chunk. +// +// From the 3rd timestamp on, a double-delta (ΔΔt) is saved: +// (t_{n} - t_{n-1}) - (t_{n-2} - t_{n-1}) +// To perform that operation, the last Δt is saved at the end of the chunk for +// as long the chunk is not closed yet (see above). +// +// Most of the times, ΔΔt is zero, even with the ms-precision of +// Prometheus. Therefore, we save a ΔΔt of zero as a leading '0' bit followed by +// 7 bits counting the number of consecutive ΔΔt==0 (the count is offset by -1, +// so the range of 0 to 127 represents 1 to 128 repetitions). +// +// If ΔΔt != 0, we essentially apply the Gorilla encoding scheme (cf. section +// 4.1.1 in the paper) but with different bit buckets as Prometheus uses ms +// rather than s, and the default scrape interval is 1m rather than 4m). In +// particular: +// +// - If ΔΔt is between [-32,31], store '10' followed by a 6 bit value. This is +// for minor irregularities in the scrape interval. 
+// +// - If ΔΔt is between [-65536,65535], store '110' followed by a 17 bit +// value. This will typically happen if a scrape is missed completely. +// +// - If ΔΔt is betwees [-4194304,4194303], store '111' followed by a 23 bit +// value. This spans more than 1h, which is usually enough as we close a +// chunk anyway if it doesn't receive any sample in 1h. +// +// - Should we nevertheless encounter a larger ΔΔt, we simply close the chunk, +// add the new sample as the last of the chunk, and add subsequent samples to +// a new chunk. +// +// VALUE ENCODING +// +// Value encoding can change and is determined by the two least significant bits +// of the 'flags' byte at bit position 280. The encoding can be changed without +// transcoding upon adding the 3rd sample. After that, an encoding change +// results either in transcoding or in closing the chunk. +// +// The 1st sample value is always saved directly. The 2nd sample value is saved +// in the header as the last value. Upon saving the 3rd value, an encoding is +// chosen, and the chunk is prepared accordingly. +// +// The following value encodings exist (with their value in the flags byte): +// +// 0: "Zero encoding". +// +// In many time series, the value simply stays constant over a long time +// (e.g. the "up" time series). In that case, all sample values are determined +// by the 1st value, and no further value encoding is happening at all. The +// payload consists entirely of timestamps. +// +// 1: Integer double-delta encoding. +// +// Many Prometheus metrics are integer counters and change in a quite regular +// fashion, similar to timestamps. Thus, the same double-delta encoding can be +// applied. This encoding works like the timestamp encoding described above, but +// with different bit buckets and without counting of repeated ΔΔv=0. The case +// of ΔΔv=0 is represented by a single '0' bit for each occurrence. The first Δv +// is saved as an int32 at bit position 288. The most recent Δv is saved as an +// int32 at the end of the chunk (see above). If Δv cannot be represented as a +// 32 bit signed integer, no integer double-delta encoding can be applied. +// +// Bit buckets (lead-in bytes followed by (signed) value bits): +// - '0': 0 bit +// - '10': 6 bit +// - '110': 13 bit +// - '1110': 20 bit +// - '1111': 33 bit +// Since Δv is restricted to 32 bit, 33 bit are always enough for ΔΔv. +// +// 2: XOR encoding. +// +// This follows almost precisely the Gorilla value encoding (cf. section 4.1.2 +// of the paper). The last count of leading zeros and the last count of +// meaningful bits in the XOR value is saved at the end of the chunk for as long +// as the chunk is not closed yet (see above). Note, though, that the number of +// significant bits is saved as (count-1), i.e. a saved value of 0 means 1 +// significant bit, a saved value of 1 means 2, and so on. Also, we save the +// numbers of leading zeros and significant bits anew if they drop a +// lot. Otherwise, you can easily be locked in with a high number of significant +// bits. +// +// 3: Direct encoding. +// +// If the sample values are just random, it is most efficient to save sample +// values directly as float64. +// +// ZIPPING TIMESTAMPS AND VALUES TOGETHER +// +// Usually, encoded timestamps and encoded values simply alternate. There are +// two exceptions: +// +// (1) With the "zero encoding" for values, the payload only contains +// timestamps. 
+// +// (2) In a consecutive row of up to 128 ΔΔt=0 repeats, the count of timestamps +// determines how many sample values will follow directly after another. + +const ( + varbitMinLength = 128 + varbitMaxLength = 8191 + + // Useful byte offsets. + varbitFirstTimeOffset = 0 + varbitFirstValueOffset = 8 + varbitLastTimeOffset = 16 + varbitLastValueOffset = 24 + varbitFirstTimeDeltaOffset = 32 + varbitFlagOffset = 35 + varbitNextSampleBitOffsetOffset = 36 + varbitFirstValueDeltaOffset = 38 + // The following are in the "footer" and only usable if the chunk is + // still open. + varbitCountOffsetBitOffset = chunkLen - 9 + varbitLastTimeDeltaOffset = chunkLen - 7 + varbitLastValueDeltaOffset = chunkLen - 4 + varbitLastLeadingZerosCountOffset = chunkLen - 4 + varbitLastSignificantBitsCountOffset = chunkLen - 3 + + varbitFirstSampleBitOffset uint16 = 0 // Symbolic, don't really read or write here. + varbitSecondSampleBitOffset uint16 = 1 // Symbolic, don't really read or write here. + // varbitThirdSampleBitOffset is a bit special. Depending on the encoding, there can + // be various things at this offset. It's most of the time symbolic, but in the best + // case (zero encoding for values), it will be the real offset for the 3rd sample. + varbitThirdSampleBitOffset uint16 = varbitFirstValueDeltaOffset * 8 + + // If the bit offset for the next sample is above this threshold, no new + // samples can be added to the chunk's payload (because the payload has + // already reached the footer). However, one more sample can be saved in + // the header as the last sample. + varbitNextSampleBitOffsetThreshold = 8 * varbitCountOffsetBitOffset + + varbitMaxTimeDelta = 1 << 24 // What fits into a 3-byte timestamp. +) + +type varbitValueEncoding byte + +const ( + varbitZeroEncoding varbitValueEncoding = iota + varbitIntDoubleDeltaEncoding + varbitXOREncoding + varbitDirectEncoding +) + +// varbitWorstCaseBitsPerSample provides the worst-case number of bits needed +// per sample with the various value encodings. The counts already include the +// up to 27 bits taken by a timestamp. +var varbitWorstCaseBitsPerSample = map[varbitValueEncoding]int{ + varbitZeroEncoding: 27 + 0, + varbitIntDoubleDeltaEncoding: 27 + 38, + varbitXOREncoding: 27 + 13 + 64, + varbitDirectEncoding: 27 + 64, +} + +// varbitChunk implements the chunk interface. +type varbitChunk []byte + +// newVarbitChunk returns a newly allocated varbitChunk. For simplicity, all +// varbit chunks must have the length as determined by the chunkLen constant. +func newVarbitChunk(enc varbitValueEncoding) *varbitChunk { + if chunkLen < varbitMinLength || chunkLen > varbitMaxLength { + panic(fmt.Errorf( + "invalid chunk length of %d bytes, need at least %d bytes and at most %d bytes", + chunkLen, varbitMinLength, varbitMaxLength, + )) + } + if enc > varbitDirectEncoding { + panic(fmt.Errorf("unknown varbit value encoding: %v", enc)) + } + c := make(varbitChunk, chunkLen) + c.setValueEncoding(enc) + return &c +} + +// add implements chunk. +func (c *varbitChunk) add(s model.SamplePair) ([]chunk, error) { + offset := c.nextSampleOffset() + switch { + case c.closed(): + return addToOverflowChunk(c, s) + case offset > varbitNextSampleBitOffsetThreshold: + return c.addLastSample(s), nil + case offset == varbitFirstSampleBitOffset: + return c.addFirstSample(s), nil + case offset == varbitSecondSampleBitOffset: + return c.addSecondSample(s) + } + return c.addLaterSample(s, offset) +} + +// clone implements chunk. 
+func (c varbitChunk) clone() chunk { + clone := make(varbitChunk, len(c)) + copy(clone, c) + return &clone +} + +// newIterator implements chunk. +func (c varbitChunk) newIterator() chunkIterator { + return newVarbitChunkIterator(c) +} + +// marshal implements chunk. +func (c varbitChunk) marshal(w io.Writer) error { + n, err := w.Write(c) + if err != nil { + return err + } + if n != cap(c) { + return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n) + } + return nil +} + +// marshalToBuf implements chunk. +func (c varbitChunk) marshalToBuf(buf []byte) error { + n := copy(buf, c) + if n != len(c) { + return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n) + } + return nil +} + +// unmarshal implements chunk. +func (c varbitChunk) unmarshal(r io.Reader) error { + _, err := io.ReadFull(r, c) + return err +} + +// unmarshalFromBuf implements chunk. +func (c varbitChunk) unmarshalFromBuf(buf []byte) error { + if copied := copy(c, buf); copied != cap(c) { + return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", cap(c), copied) + } + return nil +} + +// encoding implements chunk. +func (c varbitChunk) encoding() chunkEncoding { return varbit } + +// firstTime implements chunk. +func (c varbitChunk) firstTime() model.Time { + return model.Time( + binary.BigEndian.Uint64( + c[varbitFirstTimeOffset:], + ), + ) +} + +func (c varbitChunk) firstValue() model.SampleValue { + return model.SampleValue( + math.Float64frombits( + binary.BigEndian.Uint64( + c[varbitFirstValueOffset:], + ), + ), + ) +} + +func (c varbitChunk) lastTime() model.Time { + return model.Time( + binary.BigEndian.Uint64( + c[varbitLastTimeOffset:], + ), + ) +} + +func (c varbitChunk) lastValue() model.SampleValue { + return model.SampleValue( + math.Float64frombits( + binary.BigEndian.Uint64( + c[varbitLastValueOffset:], + ), + ), + ) +} + +func (c varbitChunk) firstTimeDelta() model.Time { + // Only the first 3 bytes are actually the timestamp, so get rid of the + // last one by bitshifting. + return model.Time(c[varbitFirstTimeDeltaOffset+2]) | + model.Time(c[varbitFirstTimeDeltaOffset+1])<<8 | + model.Time(c[varbitFirstTimeDeltaOffset])<<16 +} + +// firstValueDelta returns an undefined result if the encoding type is not 1. +func (c varbitChunk) firstValueDelta() int32 { + return int32(binary.BigEndian.Uint32(c[varbitFirstValueDeltaOffset:])) +} + +// lastTimeDelta returns an undefined result if the chunk is closed already. +func (c varbitChunk) lastTimeDelta() model.Time { + return model.Time(c[varbitLastTimeDeltaOffset+2]) | + model.Time(c[varbitLastTimeDeltaOffset+1])<<8 | + model.Time(c[varbitLastTimeDeltaOffset])<<16 +} + +// setLastTimeDelta must not be called if the chunk is closed already. It most +// not be called with a time that doesn't fit into 24bit, either. +func (c varbitChunk) setLastTimeDelta(dT model.Time) { + if dT > varbitMaxTimeDelta { + panic("Δt overflows 24 bit") + } + c[varbitLastTimeDeltaOffset] = byte(dT >> 16) + c[varbitLastTimeDeltaOffset+1] = byte(dT >> 8) + c[varbitLastTimeDeltaOffset+2] = byte(dT) +} + +// lastValueDelta returns an undefined result if the chunk is closed already. +func (c varbitChunk) lastValueDelta() int32 { + return int32(binary.BigEndian.Uint32(c[varbitLastValueDeltaOffset:])) +} + +// setLastValueDelta must not be called if the chunk is closed already. 
+func (c varbitChunk) setLastValueDelta(dV int32) { + binary.BigEndian.PutUint32(c[varbitLastValueDeltaOffset:], uint32(dV)) +} + +func (c varbitChunk) nextSampleOffset() uint16 { + return binary.BigEndian.Uint16(c[varbitNextSampleBitOffsetOffset:]) +} + +func (c varbitChunk) setNextSampleOffset(offset uint16) { + binary.BigEndian.PutUint16(c[varbitNextSampleBitOffsetOffset:], offset) +} + +func (c varbitChunk) valueEncoding() varbitValueEncoding { + return varbitValueEncoding(c[varbitFlagOffset] & 0x03) +} + +func (c varbitChunk) setValueEncoding(enc varbitValueEncoding) { + if enc > varbitDirectEncoding { + panic("invalid varbit value encoding") + } + c[varbitFlagOffset] &^= 0x03 // Clear. + c[varbitFlagOffset] |= byte(enc) // Set. +} + +func (c varbitChunk) closed() bool { + return c[varbitFlagOffset] > 0x7F // Most significant bit set. +} + +func (c varbitChunk) zeroDDTRepeats() (repeats uint64, offset uint16) { + offset = binary.BigEndian.Uint16(c[varbitCountOffsetBitOffset:]) + if offset == 0 { + return 0, 0 + } + return c.readBitPattern(offset, 7) + 1, offset +} + +func (c varbitChunk) setZeroDDTRepeats(repeats uint64, offset uint16) { + switch repeats { + case 0: + // Just clear the offset. + binary.BigEndian.PutUint16(c[varbitCountOffsetBitOffset:], 0) + return + case 1: + // First time we set a repeat here, so set the offset. But only + // if we haven't reached the footer yet. (If that's the case, we + // would overwrite ourselves below, and we don't need the offset + // later anyway because no more samples will be added to this + // chunk.) + if offset+7 <= varbitNextSampleBitOffsetThreshold { + binary.BigEndian.PutUint16(c[varbitCountOffsetBitOffset:], offset) + } + default: + // For a change, we are writing somewhere where we have written + // before. We need to clear the bits first. + posIn1stByte := offset % 8 + c[offset/8] &^= bitMask[7][posIn1stByte] + if posIn1stByte > 1 { + c[offset/8+1] &^= bitMask[posIn1stByte-1][0] + } + } + c.addBitPattern(offset, repeats-1, 7) +} + +func (c varbitChunk) setLastSample(s model.SamplePair) { + binary.BigEndian.PutUint64( + c[varbitLastTimeOffset:], + uint64(s.Timestamp), + ) + binary.BigEndian.PutUint64( + c[varbitLastValueOffset:], + math.Float64bits(float64(s.Value)), + ) +} + +// addFirstSample is a helper method only used by c.add(). It adds timestamp and +// value as base time and value. +func (c *varbitChunk) addFirstSample(s model.SamplePair) []chunk { + binary.BigEndian.PutUint64( + (*c)[varbitFirstTimeOffset:], + uint64(s.Timestamp), + ) + binary.BigEndian.PutUint64( + (*c)[varbitFirstValueOffset:], + math.Float64bits(float64(s.Value)), + ) + c.setLastSample(s) // To simplify handling of single-sample chunks. + c.setNextSampleOffset(varbitSecondSampleBitOffset) + return []chunk{c} +} + +// addSecondSample is a helper method only used by c.add(). It calculates the +// first time delta from the provided sample and adds it to the chunk together +// with the provided sample as the last sample. +func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]chunk, error) { + firstTimeDelta := s.Timestamp - c.firstTime() + if firstTimeDelta < 0 { + return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) + } + if firstTimeDelta > varbitMaxTimeDelta { + // A time delta too great. Still, we can add it as a last sample + // before overflowing. 
+ return c.addLastSample(s), nil + } + (*c)[varbitFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16) + (*c)[varbitFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8) + (*c)[varbitFirstTimeDeltaOffset+2] = byte(firstTimeDelta) + + // Also set firstTimeDelta as the last time delta to be able to use the + // normal methods for adding later samples. + c.setLastTimeDelta(firstTimeDelta) + + c.setLastSample(s) + c.setNextSampleOffset(varbitThirdSampleBitOffset) + return []chunk{c}, nil +} + +// addLastSample isa a helper method only used by c.add() and in other helper +// methods called by c.add(). It simply sets the given sample as the last sample +// in the heador and declares the chunk closed. In other words, addLastSample +// adds the very last sample added to this chunk ever, while setLastSample sets +// the sample most recently added to the chunk so that it can be used for the +// calculations required to add the next sample. +func (c *varbitChunk) addLastSample(s model.SamplePair) []chunk { + c.setLastSample(s) + (*c)[varbitFlagOffset] |= 0x80 + return []chunk{c} +} + +// addLaterSample is a helper method only used by c.add(). It adds a third or +// later sample. +func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk, error) { + var ( + lastTime = c.lastTime() + lastTimeDelta = c.lastTimeDelta() + newTimeDelta = s.Timestamp - lastTime + lastValue = c.lastValue() + encoding = c.valueEncoding() + ) + + if newTimeDelta < 0 { + return nil, fmt.Errorf("Δt is less than zero: %v", newTimeDelta) + } + if offset == varbitThirdSampleBitOffset { + offset, encoding = c.prepForThirdSample(lastValue, s.Value, encoding) + } + if newTimeDelta > varbitMaxTimeDelta { + // A time delta too great. Still, we can add it as a last sample + // before overflowing. + return c.addLastSample(s), nil + } + + // Analyze worst case, does it fit? If not, set new sample as the last. + if int(offset)+varbitWorstCaseBitsPerSample[encoding] > chunkLen*8 { + return c.addLastSample(s), nil + } + + // Transcoding/overflow decisions first. + if encoding == varbitZeroEncoding && s.Value != lastValue { + // Cannot go on with zero encoding. + if offset > chunkLen*4 { + // Chunk already half full. Don't transcode, overflow instead. + return addToOverflowChunk(c, s) + } + if isInt32(s.Value - lastValue) { + // Trying int encoding looks promising. + return transcodeAndAdd(newVarbitChunk(varbitIntDoubleDeltaEncoding), c, s) + } + return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + } + if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) { + // Cannot go on with int encoding. + if offset > chunkLen*4 { + // Chunk already half full. Don't transcode, overflow instead. + return addToOverflowChunk(c, s) + } + return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s) + } + + offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta) + if overflow { + return c.addLastSample(s), nil + } + switch encoding { + case varbitZeroEncoding: + // Nothing to do. 
+ case varbitIntDoubleDeltaEncoding: + offset = c.addDDValue(offset, lastValue, s.Value) + case varbitXOREncoding: + offset = c.addXORValue(offset, lastValue, s.Value) + case varbitDirectEncoding: + offset = c.addBitPattern(offset, math.Float64bits(float64(s.Value)), 64) + default: + return nil, fmt.Errorf("unknown Varbit value encoding: %v", encoding) + } + + c.setNextSampleOffset(offset) + c.setLastSample(s) + return []chunk{c}, nil +} + +func (c varbitChunk) prepForThirdSample( + lastValue, newValue model.SampleValue, encoding varbitValueEncoding, +) (uint16, varbitValueEncoding) { + var ( + offset = varbitThirdSampleBitOffset + firstValue = c.firstValue() + firstValueDelta = lastValue - firstValue + firstXOR = math.Float64bits(float64(firstValue)) ^ math.Float64bits(float64(lastValue)) + _, firstSignificantBits = countBits(firstXOR) + secondXOR = math.Float64bits(float64(lastValue)) ^ math.Float64bits(float64(newValue)) + _, secondSignificantBits = countBits(secondXOR) + ) + // Now pick an initial encoding and prepare things accordingly. + // However, never pick an encoding "below" the one initially set. + switch { + case encoding == varbitZeroEncoding && lastValue == firstValue && lastValue == newValue: + // Stay at zero encoding. + // No value to be set. + // No offset change required. + case encoding <= varbitIntDoubleDeltaEncoding && isInt32(firstValueDelta): + encoding = varbitIntDoubleDeltaEncoding + binary.BigEndian.PutUint32( + c[varbitFirstValueDeltaOffset:], + uint32(int32(firstValueDelta)), + ) + c.setLastValueDelta(int32(firstValueDelta)) + offset += 32 + case encoding == varbitDirectEncoding || firstSignificantBits+secondSignificantBits > 100: + // Heuristics based on three samples only is a bit weak, + // but if we need 50+13 = 63 bits per sample already + // now, we might be better off going for direct encoding. + encoding = varbitDirectEncoding + // Put bit pattern directly where otherwise the delta would have gone. + binary.BigEndian.PutUint64( + c[varbitFirstValueDeltaOffset:], + math.Float64bits(float64(lastValue)), + ) + offset += 64 + default: + encoding = varbitXOREncoding + offset = c.addXORValue(offset, firstValue, lastValue) + } + c.setValueEncoding(encoding) + c.setNextSampleOffset(offset) + return offset, encoding +} + +// addDDTime requires that lastTimeDelta and newTimeDelta are positive and don't overflow 24bit. +func (c varbitChunk) addDDTime(offset uint16, lastTimeDelta, newTimeDelta model.Time) (newOffset uint16, overflow bool) { + timeDD := newTimeDelta - lastTimeDelta + + if !isSignedIntN(int64(timeDD), 23) { + return offset, true + } + + c.setLastTimeDelta(newTimeDelta) + repeats, repeatsOffset := c.zeroDDTRepeats() + + if timeDD == 0 { + if repeats == 0 || repeats == 128 { + // First zeroDDT, or counter full, prepare new counter. + offset = c.addZeroBit(offset) + repeatsOffset = offset + offset += 7 + repeats = 0 + } + c.setZeroDDTRepeats(repeats+1, repeatsOffset) + return offset, false + } + + // No zero repeat. If we had any before, clear the DDT offset. 
+ c.setZeroDDTRepeats(0, repeatsOffset) + + switch { + case isSignedIntN(int64(timeDD), 6): + offset = c.addOneBitsWithTrailingZero(offset, 1) + offset = c.addSignedInt(offset, int64(timeDD), 6) + case isSignedIntN(int64(timeDD), 17): + offset = c.addOneBitsWithTrailingZero(offset, 2) + offset = c.addSignedInt(offset, int64(timeDD), 17) + case isSignedIntN(int64(timeDD), 23): + offset = c.addOneBits(offset, 3) + offset = c.addSignedInt(offset, int64(timeDD), 23) + default: + panic("unexpected required bits for ΔΔt") + } + return offset, false +} + +// addDDValue requires that newValue-lastValue can be represented with an int32. +func (c varbitChunk) addDDValue(offset uint16, lastValue, newValue model.SampleValue) uint16 { + newValueDelta := int64(newValue - lastValue) + lastValueDelta := c.lastValueDelta() + valueDD := newValueDelta - int64(lastValueDelta) + c.setLastValueDelta(int32(newValueDelta)) + + switch { + case valueDD == 0: + return c.addZeroBit(offset) + case isSignedIntN(valueDD, 6): + offset = c.addOneBitsWithTrailingZero(offset, 1) + return c.addSignedInt(offset, valueDD, 6) + case isSignedIntN(valueDD, 13): + offset = c.addOneBitsWithTrailingZero(offset, 2) + return c.addSignedInt(offset, valueDD, 13) + case isSignedIntN(valueDD, 20): + offset = c.addOneBitsWithTrailingZero(offset, 3) + return c.addSignedInt(offset, valueDD, 20) + case isSignedIntN(valueDD, 33): + offset = c.addOneBits(offset, 4) + return c.addSignedInt(offset, valueDD, 33) + default: + panic("unexpected required bits for ΔΔv") + } +} + +func (c varbitChunk) addXORValue(offset uint16, lastValue, newValue model.SampleValue) uint16 { + lastPattern := math.Float64bits(float64(lastValue)) + newPattern := math.Float64bits(float64(newValue)) + xor := lastPattern ^ newPattern + if xor == 0 { + return c.addZeroBit(offset) + } + + lastLeadingBits := c[varbitLastLeadingZerosCountOffset] + lastSignificantBits := c[varbitLastSignificantBitsCountOffset] + newLeadingBits, newSignificantBits := countBits(xor) + + // Short entry if the new significant bits fit into the same box as the + // last significant bits. However, should the new significant bits be + // shorter by 10 or more, go for a long entry instead, as we will + // probably save more (11 bit one-time overhead, potentially more to + // save later). + if newLeadingBits >= lastLeadingBits && + newLeadingBits+newSignificantBits <= lastLeadingBits+lastSignificantBits && + lastSignificantBits-newSignificantBits < 10 { + offset = c.addOneBitsWithTrailingZero(offset, 1) + return c.addBitPattern( + offset, + xor>>(64-lastLeadingBits-lastSignificantBits), + uint16(lastSignificantBits), + ) + } + + // Long entry. + c[varbitLastLeadingZerosCountOffset] = newLeadingBits + c[varbitLastSignificantBitsCountOffset] = newSignificantBits + offset = c.addOneBits(offset, 2) + offset = c.addBitPattern(offset, uint64(newLeadingBits), 5) + offset = c.addBitPattern(offset, uint64(newSignificantBits-1), 6) // Note -1! + return c.addBitPattern( + offset, + xor>>(64-newLeadingBits-newSignificantBits), + uint16(newSignificantBits), + ) +} + +func (c varbitChunk) addZeroBit(offset uint16) uint16 { + if offset < varbitNextSampleBitOffsetThreshold { + // Writing a zero to a never touched area is a no-op. + // Just increase the offset. 
+ return offset + 1 + } + c[offset/8] &^= bitMask[1][offset%8] + return offset + 1 +} + +func (c varbitChunk) addOneBits(offset uint16, n uint16) uint16 { + if n > 7 { + panic("unexpected number of control bits") + } + b := 8 - offset%8 + if b > n { + b = n + } + c[offset/8] |= bitMask[b][offset%8] + offset += b + b = n - b + if b > 0 { + c[offset/8] |= bitMask[b][0] + offset += b + } + return offset +} +func (c varbitChunk) addOneBitsWithTrailingZero(offset uint16, n uint16) uint16 { + offset = c.addOneBits(offset, n) + return c.addZeroBit(offset) +} + +// addSignedInt adds i as a signed integer with n bits. It requires i to be +// representable as such. (Check with isSignedIntN first.) +func (c varbitChunk) addSignedInt(offset uint16, i int64, n uint16) uint16 { + if i < 0 && n < 64 { + i += 1 << n + } + return c.addBitPattern(offset, uint64(i), n) +} + +// addBitPattern adds the last n bits of the given pattern. Other bits in the +// pattern must be 0. +func (c varbitChunk) addBitPattern(offset uint16, pattern uint64, n uint16) uint16 { + var ( + byteOffset = offset / 8 + bitsToWrite = 8 - offset%8 + newOffset = offset + n + ) + + // Clean up the parts of the footer we will write into. (But not more as + // we are still using the value related part of the footer when we have + // already overwritten timestamp related parts.) + if newOffset > varbitNextSampleBitOffsetThreshold { + pos := offset + if pos < varbitNextSampleBitOffsetThreshold { + pos = varbitNextSampleBitOffsetThreshold + } + for pos < newOffset { + posInByte := pos % 8 + bitsToClear := newOffset - pos + if bitsToClear > 8-posInByte { + bitsToClear = 8 - posInByte + } + c[pos/8] &^= bitMask[bitsToClear][posInByte] + pos += bitsToClear + } + } + + for n > 0 { + if n <= bitsToWrite { + c[byteOffset] |= byte(pattern << (bitsToWrite - n)) + break + } + c[byteOffset] |= byte(pattern >> (n - bitsToWrite)) + n -= bitsToWrite + bitsToWrite = 8 + byteOffset++ + } + return newOffset +} + +// readBitPattern reads n bits at the given offset and returns them as the last +// n bits in a uint64. +func (c varbitChunk) readBitPattern(offset, n uint16) uint64 { + var ( + result uint64 + byteOffset = offset / 8 + bitOffset = offset % 8 + trailingBits, bitsToRead uint16 + ) + + for n > 0 { + trailingBits = 0 + bitsToRead = 8 - bitOffset + if bitsToRead > n { + trailingBits = bitsToRead - n + bitsToRead = n + } + result <<= bitsToRead + result |= uint64( + (c[byteOffset] & bitMask[bitsToRead][bitOffset]) >> trailingBits, + ) + n -= bitsToRead + byteOffset++ + bitOffset = 0 + } + return result +} + +type varbitChunkIterator struct { + c varbitChunk + // pos is the bit position within the chunk for the next sample to be + // decoded when scan() is called (i.e. it is _not_ the bit position of + // the sample currently returned by value()). The symbolic values + // varbitFirstSampleBitOffset and varbitSecondSampleBitOffset are also + // used for pos. len is the offset of the first bit in the chunk that is + // not part of the payload. If pos==len, then the iterator is positioned + // behind the last sample in the payload. However, the next call of + // scan() still has to check if the chunk is closed, in which case there + // is one more sample, saved in the header. To mark the iterator as + // having scanned that last sample, too, pos is set to len+1. + pos, len uint16 + t, dT model.Time + repeats byte // Repeats of ΔΔt=0. + v model.SampleValue + dV int64 // Only used for int value encoding. 
+ leading, significant uint16 + enc varbitValueEncoding + lastError error + rewound bool + nextT model.Time // Only for rewound state. + nextV model.SampleValue // Only for rewound state. +} + +func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator { + return &varbitChunkIterator{ + c: c, + len: c.nextSampleOffset(), + t: model.Earliest, + enc: c.valueEncoding(), + significant: 1, + } +} + +// lastTimestamp implements chunkIterator. +func (it *varbitChunkIterator) lastTimestamp() (model.Time, error) { + if it.len == varbitFirstSampleBitOffset { + // No samples in the chunk yet. + return model.Earliest, it.lastError + } + return it.c.lastTime(), it.lastError +} + +// contains implements chunkIterator. +func (it *varbitChunkIterator) contains(t model.Time) (bool, error) { + last, err := it.lastTimestamp() + if err != nil { + it.lastError = err + return false, err + } + return !t.Before(it.c.firstTime()) && + !t.After(last), it.lastError +} + +// scan implements chunkIterator. +func (it *varbitChunkIterator) scan() bool { + if it.lastError != nil { + return false + } + if it.rewound { + it.t = it.nextT + it.v = it.nextV + it.rewound = false + return true + } + if it.pos > it.len { + return false + } + if it.pos == it.len && it.repeats == 0 { + it.pos = it.len + 1 + if !it.c.closed() { + return false + } + it.t = it.c.lastTime() + it.v = it.c.lastValue() + return it.lastError == nil + } + if it.pos == varbitFirstSampleBitOffset { + it.t = it.c.firstTime() + it.v = it.c.firstValue() + it.pos = varbitSecondSampleBitOffset + return it.lastError == nil + } + if it.pos == varbitSecondSampleBitOffset { + if it.len == varbitThirdSampleBitOffset && !it.c.closed() { + // Special case: Chunk has only two samples. + it.t = it.c.lastTime() + it.v = it.c.lastValue() + it.pos = it.len + 1 + return it.lastError == nil + } + it.dT = it.c.firstTimeDelta() + it.t += it.dT + // Value depends on encoding. + switch it.enc { + case varbitZeroEncoding: + it.pos = varbitThirdSampleBitOffset + case varbitIntDoubleDeltaEncoding: + it.dV = int64(it.c.firstValueDelta()) + it.v += model.SampleValue(it.dV) + it.pos = varbitThirdSampleBitOffset + 32 + case varbitXOREncoding: + it.pos = varbitThirdSampleBitOffset + it.readXOR() + case varbitDirectEncoding: + it.v = model.SampleValue(math.Float64frombits( + binary.BigEndian.Uint64(it.c[varbitThirdSampleBitOffset/8:]), + )) + it.pos = varbitThirdSampleBitOffset + 64 + default: + it.lastError = fmt.Errorf("unknown varbit value encoding: %v", it.enc) + } + return it.lastError == nil + } + // 3rd sample or later does not have special cases anymore. + it.readDDT() + switch it.enc { + case varbitZeroEncoding: + // Do nothing. + case varbitIntDoubleDeltaEncoding: + it.readDDV() + case varbitXOREncoding: + it.readXOR() + case varbitDirectEncoding: + it.v = model.SampleValue(math.Float64frombits(it.readBitPattern(64))) + return it.lastError == nil + default: + it.lastError = fmt.Errorf("unknown varbit value encoding: %v", it.enc) + return false + } + return it.lastError == nil +} + +// findAtOrBefore implements chunkIterator. 
+func (it *varbitChunkIterator) findAtOrBefore(t model.Time) bool { + if it.len == 0 || t.Before(it.c.firstTime()) { + return false + } + last := it.c.lastTime() + if !t.Before(last) { + it.t = last + it.v = it.c.lastValue() + it.pos = it.len + 1 + return true + } + if t == it.t { + return it.lastError == nil + } + if t.Before(it.t) || it.rewound { + it.reset() + } + + var ( + prevT = model.Earliest + prevV model.SampleValue + ) + for it.scan() && t.After(it.t) { + prevT = it.t + prevV = it.v + // TODO(beorn7): If we are in a repeat, we could iterate forward + // much faster. + } + if t == it.t { + return it.lastError == nil + } + it.rewind(prevT, prevV) + return it.lastError == nil +} + +// findAtOrAfter implements chunkIterator. +func (it *varbitChunkIterator) findAtOrAfter(t model.Time) bool { + if it.len == 0 || t.After(it.c.lastTime()) { + return false + } + first := it.c.firstTime() + if !t.After(first) { + it.reset() + return it.scan() + } + if t == it.t { + return it.lastError == nil + } + if t.Before(it.t) { + it.reset() + } + for it.scan() && t.After(it.t) { + // TODO(beorn7): If we are in a repeat, we could iterate forward + // much faster. + } + return it.lastError == nil +} + +// value implements chunkIterator. +func (it *varbitChunkIterator) value() model.SamplePair { + return model.SamplePair{ + Timestamp: it.t, + Value: it.v, + } +} + +// err implements chunkIterator. +func (it *varbitChunkIterator) err() error { + return it.lastError +} + +func (it *varbitChunkIterator) readDDT() { + if it.repeats > 0 { + it.repeats-- + } else { + switch it.readControlBits(3) { + case 0: + it.repeats = byte(it.readBitPattern(7)) + case 1: + it.dT += model.Time(it.readSignedInt(6)) + case 2: + it.dT += model.Time(it.readSignedInt(17)) + case 3: + it.dT += model.Time(it.readSignedInt(23)) + default: + panic("unexpected number of control bits") + } + } + it.t += it.dT +} + +func (it *varbitChunkIterator) readDDV() { + switch it.readControlBits(4) { + case 0: + // Do nothing. + case 1: + it.dV += it.readSignedInt(6) + case 2: + it.dV += it.readSignedInt(13) + case 3: + it.dV += it.readSignedInt(20) + case 4: + it.dV += it.readSignedInt(33) + default: + panic("unexpected number of control bits") + } + it.v += model.SampleValue(it.dV) +} + +func (it *varbitChunkIterator) readXOR() { + switch it.readControlBits(2) { + case 0: + return + case 1: + // Do nothing right now. All done below. + case 2: + it.leading = uint16(it.readBitPattern(5)) + it.significant = uint16(it.readBitPattern(6)) + 1 + default: + panic("unexpected number of control bits") + } + pattern := math.Float64bits(float64(it.v)) + pattern ^= it.readBitPattern(it.significant) << (64 - it.significant - it.leading) + it.v = model.SampleValue(math.Float64frombits(pattern)) +} + +// readControlBits reads successive 1-bits and stops after reading the first +// 0-bit. It also stops once it has read max bits. It returns the number of read +// 1-bits. 
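+// Reading beyond the end of the chunk sets errChunkBoundsExceeded.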
+func (it *varbitChunkIterator) readControlBits(max uint16) uint16 {
+	var count uint16
+	for count < max && int(it.pos/8) < len(it.c) {
+		b := it.c[it.pos/8] & bitMask[1][it.pos%8]
+		it.pos++
+		if b == 0 {
+			return count
+		}
+		count++
+	}
+	if int(it.pos/8) >= len(it.c) {
+		it.lastError = errChunkBoundsExceeded
+	}
+	return count
+}
+
+func (it *varbitChunkIterator) readBitPattern(n uint16) uint64 {
+	if len(it.c)*8 < int(it.pos)+int(n) {
+		it.lastError = errChunkBoundsExceeded
+		return 0
+	}
+	u := it.c.readBitPattern(it.pos, n)
+	it.pos += n
+	return u
+}
+
+func (it *varbitChunkIterator) readSignedInt(n uint16) int64 {
+	u := it.readBitPattern(n)
+	if n < 64 && u >= 1<<(n-1) {
+		u -= 1 << n
+	}
+	return int64(u)
+}
+
+// reset puts the chunk iterator into the state it had upon creation.
+func (it *varbitChunkIterator) reset() {
+	it.pos = 0
+	it.t = model.Earliest
+	it.dT = 0
+	it.repeats = 0
+	it.v = 0
+	it.dV = 0
+	it.leading = 0
+	it.significant = 1
+	it.rewound = false
+}
+
+// rewind "rewinds" the chunk iterator by one step. Since one cannot simply
+// rewind a Varbit chunk, the old values have to be provided by the
+// caller. Rewinding an already rewound iterator panics. After a call of scan
+// or reset, the iterator can be rewound again.
+func (it *varbitChunkIterator) rewind(t model.Time, v model.SampleValue) {
+	if it.rewound {
+		panic("cannot rewind varbit chunk twice")
+	}
+	it.rewound = true
+	it.nextT = it.t
+	it.nextV = it.v
+	it.t = t
+	it.v = v
+}
diff --git a/storage/local/varbit_helpers.go b/storage/local/varbit_helpers.go
new file mode 100644
index 000000000..771fb7ded
--- /dev/null
+++ b/storage/local/varbit_helpers.go
@@ -0,0 +1,75 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package local
+
+import "github.com/prometheus/common/model"
+
+var (
+	// bit masks for consecutive bits in a byte at various offsets.
+	bitMask = [][]byte{
+		{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, // 0 bit
+		{0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01}, // 1 bit
+		{0xC0, 0x60, 0x30, 0x18, 0x0C, 0x06, 0x03, 0x01}, // 2 bit
+		{0xE0, 0x70, 0x38, 0x1C, 0x0E, 0x07, 0x03, 0x01}, // 3 bit
+		{0xF0, 0x78, 0x3C, 0x1E, 0x0F, 0x07, 0x03, 0x01}, // 4 bit
+		{0xF8, 0x7C, 0x3E, 0x1F, 0x0F, 0x07, 0x03, 0x01}, // 5 bit
+		{0xFC, 0x7E, 0x3F, 0x1F, 0x0F, 0x07, 0x03, 0x01}, // 6 bit
+		{0xFE, 0x7F, 0x3F, 0x1F, 0x0F, 0x07, 0x03, 0x01}, // 7 bit
+		{0xFF, 0x7F, 0x3F, 0x1F, 0x0F, 0x07, 0x03, 0x01}, // 8 bit
+	}
+)
+
+// isInt32 returns true if v can be represented as an int32.
+func isInt32(v model.SampleValue) bool {
+	return model.SampleValue(int32(v)) == v
+}
+
+// countBits returns the number of leading zero bits and the number of
+// significant bits after that in the given bit pattern. The maximum number of
+// leading zeros is 31 (so that it can be represented by a 5-bit number).
+// Leading zeros beyond that are considered part of the significant bits.
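+// For example, countBits(0x00F0000000000000) yields (8, 4), while countBits(1)
+// yields (31, 33): its 63 leading zeros are capped at 31, and the surplus 32
+// zeros are counted towards the significant bits.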
+func countBits(pattern uint64) (leading, significant byte) {
+	// TODO(beorn7): This would probably be faster with ugly endless switch
+	// statements.
+	if pattern == 0 {
+		return
+	}
+	for pattern < 1<<63 {
+		leading++
+		pattern <<= 1
+	}
+	for pattern > 0 {
+		significant++
+		pattern <<= 1
+	}
+	if leading > 31 { // 5-bit limit.
+		significant += leading - 31
+		leading = 31
+	}
+	return
+}
+
+// isSignedIntN returns whether i can be represented as a signed integer with
+// the given bit length n.
+func isSignedIntN(i int64, n byte) bool {
+	upper := int64(1) << (n - 1)
+	if i >= upper {
+		return false
+	}
+	lower := upper - (1 << n)
+	if i < lower {
+		return false
+	}
+	return true
+}
diff --git a/storage/local/varbit_test.go b/storage/local/varbit_test.go
new file mode 100644
index 000000000..f440283f0
--- /dev/null
+++ b/storage/local/varbit_test.go
@@ -0,0 +1,52 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package local
+
+import "testing"
+
+func TestCountBits(t *testing.T) {
+	for i := byte(0); i < 56; i++ {
+		for j := byte(0); j <= 8; j++ {
+			for k := byte(0); k < 8; k++ {
+				p := uint64(bitMask[j][k]) << i
+				gotLeading, gotSignificant := countBits(p)
+				wantLeading := 56 - i + k
+				wantSignificant := j
+				if j+k > 8 {
+					wantSignificant -= j + k - 8
+				}
+				if wantLeading > 31 {
+					wantSignificant += wantLeading - 31
+					wantLeading = 31
+				}
+				if p == 0 {
+					wantLeading = 0
+					wantSignificant = 0
+				}
+				if wantLeading != gotLeading {
+					t.Errorf(
+						"unexpected leading bit count for i=%d, j=%d, k=%d; want %d, got %d",
+						i, j, k, wantLeading, gotLeading,
+					)
+				}
+				if wantSignificant != gotSignificant {
+					t.Errorf(
+						"unexpected significant bit count for i=%d, j=%d, k=%d; want %d, got %d",
+						i, j, k, wantSignificant, gotSignificant,
+					)
+				}
+			}
+		}
+	}
+}
diff --git a/template/template_test.go b/template/template_test.go
index 0dd46a4fd..7a39b3622 100644
--- a/template/template_test.go
+++ b/template/template_test.go
@@ -189,7 +189,7 @@ func TestTemplateExpansion(t *testing.T) {
 
 	time := model.Time(0)
 
-	storage, closer := local.NewTestStorage(t, 1)
+	storage, closer := local.NewTestStorage(t, 2)
 	defer closer.Close()
 	storage.Append(&model.Sample{
 		Metric: model.Metric{
diff --git a/web/api/legacy/api_test.go b/web/api/legacy/api_test.go
index 770e0af0e..e6ef21805 100644
--- a/web/api/legacy/api_test.go
+++ b/web/api/legacy/api_test.go
@@ -87,7 +87,7 @@ func TestQuery(t *testing.T) {
 		},
 	}
 
-	storage, closer := local.NewTestStorage(t, 1)
+	storage, closer := local.NewTestStorage(t, 2)
 	defer closer.Close()
 	storage.Append(&model.Sample{
 		Metric: model.Metric{