prometheus/storage/local/persistence_test.go
beorn7 32f280a3cd Slim down the chunkIterator interface
For one, remove unneeded methods.

Then, instead of using a channel for all values, use a
bufio.Scanner-like interface. This removes the need for creating a
goroutine and avoids the (unnecessary) locking performed by channel
sending and receiving.

This will make it much easier to write new chunk implementations (like
Gorilla-style encoding).
2016-03-07 19:50:13 +01:00

1256 lines
33 KiB
Go

// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"bufio"
"errors"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/util/testutil"
)
var (
m1 = model.Metric{"label": "value1"}
m2 = model.Metric{"label": "value2"}
m3 = model.Metric{"label": "value3"}
m4 = model.Metric{"label": "value4"}
m5 = model.Metric{"label": "value5"}
)
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, testutil.Closer) {
DefaultChunkEncoding = encoding
dir := testutil.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1)
if err != nil {
dir.Close()
t.Fatal(err)
}
go p.run()
return p, testutil.NewCallbackCloser(func() {
p.close()
dir.Close()
})
}
func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk {
fps := model.Fingerprints{
m1.FastFingerprint(),
m2.FastFingerprint(),
m3.FastFingerprint(),
}
fpToChunks := map[model.Fingerprint][]chunk{}
for _, fp := range fps {
fpToChunks[fp] = make([]chunk, 0, 10)
for i := 0; i < 10; i++ {
ch, err := newChunkForEncoding(encoding)
if err != nil {
t.Fatal(err)
}
chs, err := ch.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(fp),
})
if err != nil {
t.Fatal(err)
}
fpToChunks[fp] = append(fpToChunks[fp], chs[0])
}
}
return fpToChunks
}
func chunksEqual(c1, c2 chunk) bool {
it1 := c1.newIterator()
it2 := c2.newIterator()
for it1.scan() && it2.scan() {
if !(it1.value() == it2.value()) {
return false
}
}
return it1.err() == nil && it2.err() == nil
}
func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
fpToChunks := buildTestChunks(t, encoding)
for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err :=
p.dropAndPersistChunks(fp, model.Earliest, chunks)
if err != nil {
t.Fatal(err)
}
if got, want := firstTimeNotDropped, model.Time(0); got != want {
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
}
if got, want := offset, 0; got != want {
t.Errorf("Want offset %v, got %v.", got, want)
}
if got, want := numDropped, 0; got != want {
t.Errorf("Want numDropped %v, got %v.", got, want)
}
if allDropped {
t.Error("All dropped.")
}
}
for fp, expectedChunks := range fpToChunks {
indexes := make([]int, 0, len(expectedChunks))
for i := range expectedChunks {
indexes = append(indexes, i)
}
actualChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for _, i := range indexes {
if !chunksEqual(expectedChunks[i], actualChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
// Load all chunk descs.
actualChunkDescs, err := p.loadChunkDescs(fp, 0)
if len(actualChunkDescs) != 10 {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
}
for i, cd := range actualChunkDescs {
lastTime, err := cd.lastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), lastTime,
)
}
}
// Load chunk descs partially.
actualChunkDescs, err = p.loadChunkDescs(fp, 5)
if len(actualChunkDescs) != 5 {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5)
}
for i, cd := range actualChunkDescs {
lastTime, err := cd.lastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), lastTime,
)
}
}
}
// Drop half of the chunks.
for fp, expectedChunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 5, nil)
if err != nil {
t.Fatal(err)
}
if offset != 5 {
t.Errorf("want offset 5, got %d", offset)
}
if firstTime != 5 {
t.Errorf("want first time 5, got %d", firstTime)
}
if numDropped != 5 {
t.Errorf("want 5 dropped chunks, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
indexes := make([]int, 5)
for i := range indexes {
indexes[i] = i
}
actualChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for _, i := range indexes {
if !chunksEqual(expectedChunks[i+5], actualChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
}
// Drop all the chunks.
for fp := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 100, nil)
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if numDropped != 5 {
t.Errorf("want 5 dropped chunks, got %v", numDropped)
}
if !allDropped {
t.Error("not all chunks dropped")
}
}
// Re-add first two of the chunks.
for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err :=
p.dropAndPersistChunks(fp, model.Earliest, chunks[:2])
if err != nil {
t.Fatal(err)
}
if got, want := firstTimeNotDropped, model.Time(0); got != want {
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
}
if got, want := offset, 0; got != want {
t.Errorf("Want offset %v, got %v.", got, want)
}
if got, want := numDropped, 0; got != want {
t.Errorf("Want numDropped %v, got %v.", got, want)
}
if allDropped {
t.Error("All dropped.")
}
}
// Drop the first of the chunks while adding two more.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, chunks[2:4])
if err != nil {
t.Fatal(err)
}
if offset != 1 {
t.Errorf("want offset 1, got %d", offset)
}
if firstTime != 1 {
t.Errorf("want first time 1, got %d", firstTime)
}
if numDropped != 1 {
t.Errorf("want 1 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
wantChunks := chunks[1:4]
indexes := make([]int, len(wantChunks))
for i := range indexes {
indexes[i] = i
}
gotChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for i, wantChunk := range wantChunks {
if !chunksEqual(wantChunk, gotChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
}
// Drop all the chunks while adding two more.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 4, chunks[4:6])
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if firstTime != 4 {
t.Errorf("want first time 4, got %d", firstTime)
}
if numDropped != 3 {
t.Errorf("want 3 dropped chunks, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
wantChunks := chunks[4:6]
indexes := make([]int, len(wantChunks))
for i := range indexes {
indexes[i] = i
}
gotChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for i, wantChunk := range wantChunks {
if !chunksEqual(wantChunk, gotChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
}
// While adding two more, drop all but one of the added ones.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 7, chunks[6:8])
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if firstTime != 7 {
t.Errorf("want first time 7, got %d", firstTime)
}
if numDropped != 3 {
t.Errorf("want 3 dropped chunks, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
wantChunks := chunks[7:8]
indexes := make([]int, len(wantChunks))
for i := range indexes {
indexes[i] = i
}
gotChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for i, wantChunk := range wantChunks {
if !chunksEqual(wantChunk, gotChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
}
// While adding two more, drop all chunks including the added ones.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 10, chunks[8:])
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if numDropped != 3 {
t.Errorf("want 3 dropped chunks, got %v", numDropped)
}
if !allDropped {
t.Error("not all chunks dropped")
}
}
// Now set minShrinkRatio to 0.25 and play with it.
p.minShrinkRatio = 0.25
// Re-add 8 chunks.
for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err :=
p.dropAndPersistChunks(fp, model.Earliest, chunks[:8])
if err != nil {
t.Fatal(err)
}
if got, want := firstTimeNotDropped, model.Time(0); got != want {
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
}
if got, want := offset, 0; got != want {
t.Errorf("Want offset %v, got %v.", got, want)
}
if got, want := numDropped, 0; got != want {
t.Errorf("Want numDropped %v, got %v.", got, want)
}
if allDropped {
t.Error("All dropped.")
}
}
// Drop only the first chunk should not happen, but persistence should still work.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, chunks[8:9])
if err != nil {
t.Fatal(err)
}
if offset != 8 {
t.Errorf("want offset 8, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if numDropped != 0 {
t.Errorf("want 0 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
}
// Drop only the first two chunks should not happen, either.
for fp := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 2, nil)
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if numDropped != 0 {
t.Errorf("want 0 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
}
// Drop the first three chunks should finally work.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 3, chunks[9:])
if err != nil {
t.Fatal(err)
}
if offset != 6 {
t.Errorf("want offset 6, got %d", offset)
}
if firstTime != 3 {
t.Errorf("want first time 3, got %d", firstTime)
}
if numDropped != 3 {
t.Errorf("want 3 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
}
}
func TestPersistLoadDropChunksType0(t *testing.T) {
testPersistLoadDropChunks(t, 0)
}
func TestPersistLoadDropChunksType1(t *testing.T) {
testPersistLoadDropChunks(t, 1)
}
func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
fpLocker := newFingerprintLocker(10)
sm := newSeriesMap()
s1, _ := newMemorySeries(m1, nil, time.Time{})
s2, _ := newMemorySeries(m2, nil, time.Time{})
s3, _ := newMemorySeries(m3, nil, time.Time{})
s4, _ := newMemorySeries(m4, nil, time.Time{})
s5, _ := newMemorySeries(m5, nil, time.Time{})
s1.add(model.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(model.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkClosed = true
s3.persistWatermark = 1
for i := 0; i < 10000; i++ {
s4.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(i) / 2,
})
s5.add(model.SamplePair{
Timestamp: model.Time(i),
Value: model.SampleValue(i * i),
})
}
s5.persistWatermark = 3
chunkCountS4 := len(s4.chunkDescs)
chunkCountS5 := len(s5.chunkDescs)
sm.put(m1.FastFingerprint(), s1)
sm.put(m2.FastFingerprint(), s2)
sm.put(m3.FastFingerprint(), s3)
sm.put(m4.FastFingerprint(), s4)
sm.put(m5.FastFingerprint(), s5)
if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil {
t.Fatal(err)
}
loadedSM, _, err := p.loadSeriesMapAndHeads()
if err != nil {
t.Fatal(err)
}
if loadedSM.length() != 4 {
t.Errorf("want 4 series in map, got %d", loadedSM.length())
}
if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok {
if !reflect.DeepEqual(loadedS1.metric, m1) {
t.Errorf("want metric %v, got %v", m1, loadedS1.metric)
}
if !reflect.DeepEqual(loadedS1.head().c, s1.head().c) {
t.Error("head chunks differ")
}
if loadedS1.chunkDescsOffset != 0 {
t.Errorf("want chunkDescsOffset 0, got %d", loadedS1.chunkDescsOffset)
}
if loadedS1.headChunkClosed {
t.Error("headChunkClosed is true")
}
if loadedS1.head().chunkFirstTime != 1 {
t.Errorf("want chunkFirstTime in head chunk to be 1, got %d", loadedS1.head().chunkFirstTime)
}
if loadedS1.head().chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
} else {
t.Errorf("couldn't find %v in loaded map", m1)
}
if loadedS3, ok := loadedSM.get(m3.FastFingerprint()); ok {
if !reflect.DeepEqual(loadedS3.metric, m3) {
t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
}
if loadedS3.head().c != nil {
t.Error("head chunk not evicted")
}
if loadedS3.chunkDescsOffset != 0 {
t.Errorf("want chunkDescsOffset 0, got %d", loadedS3.chunkDescsOffset)
}
if !loadedS3.headChunkClosed {
t.Error("headChunkClosed is false")
}
if loadedS3.head().chunkFirstTime != 2 {
t.Errorf("want chunkFirstTime in head chunk to be 2, got %d", loadedS3.head().chunkFirstTime)
}
if loadedS3.head().chunkLastTime != 2 {
t.Errorf("want chunkLastTime in head chunk to be 2, got %d", loadedS3.head().chunkLastTime)
}
} else {
t.Errorf("couldn't find %v in loaded map", m3)
}
if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok {
if !reflect.DeepEqual(loadedS4.metric, m4) {
t.Errorf("want metric %v, got %v", m4, loadedS4.metric)
}
if got, want := len(loadedS4.chunkDescs), chunkCountS4; got != want {
t.Errorf("got %d chunkDescs, want %d", got, want)
}
if got, want := loadedS4.persistWatermark, 0; got != want {
t.Errorf("got persistWatermark %d, want %d", got, want)
}
if loadedS4.chunkDescs[2].isEvicted() {
t.Error("3rd chunk evicted")
}
if loadedS4.chunkDescs[3].isEvicted() {
t.Error("4th chunk evicted")
}
if loadedS4.chunkDescsOffset != 0 {
t.Errorf("want chunkDescsOffset 0, got %d", loadedS4.chunkDescsOffset)
}
if loadedS4.headChunkClosed {
t.Error("headChunkClosed is true")
}
for i, cd := range loadedS4.chunkDescs {
if cd.chunkFirstTime != cd.c.firstTime() {
t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.firstTime(), cd.chunkFirstTime,
)
}
if i == len(loadedS4.chunkDescs)-1 {
// Head chunk.
if cd.chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
continue
}
lastTime, err := cd.c.newIterator().lastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, lastTime, cd.chunkLastTime,
)
}
}
} else {
t.Errorf("couldn't find %v in loaded map", m4)
}
if loadedS5, ok := loadedSM.get(m5.FastFingerprint()); ok {
if !reflect.DeepEqual(loadedS5.metric, m5) {
t.Errorf("want metric %v, got %v", m5, loadedS5.metric)
}
if got, want := len(loadedS5.chunkDescs), chunkCountS5; got != want {
t.Errorf("got %d chunkDescs, want %d", got, want)
}
if got, want := loadedS5.persistWatermark, 3; got != want {
t.Errorf("got persistWatermark %d, want %d", got, want)
}
if !loadedS5.chunkDescs[2].isEvicted() {
t.Error("3rd chunk not evicted")
}
if loadedS5.chunkDescs[3].isEvicted() {
t.Error("4th chunk evicted")
}
if loadedS5.chunkDescsOffset != 0 {
t.Errorf("want chunkDescsOffset 0, got %d", loadedS5.chunkDescsOffset)
}
if loadedS5.headChunkClosed {
t.Error("headChunkClosed is true")
}
for i, cd := range loadedS5.chunkDescs {
if i < 3 {
// Evicted chunks.
if cd.chunkFirstTime == model.Earliest {
t.Errorf("chunkDesc[%d]: chunkLastTime not set", i)
}
continue
}
if cd.chunkFirstTime != cd.c.firstTime() {
t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.firstTime(), cd.chunkFirstTime,
)
}
if i == len(loadedS5.chunkDescs)-1 {
// Head chunk.
if cd.chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
continue
}
lastTime, err := cd.c.newIterator().lastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.chunkLastTime, lastTime,
)
}
}
} else {
t.Errorf("couldn't find %v in loaded map", m5)
}
}
func TestCheckpointAndLoadSeriesMapAndHeadsChunkType0(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 0)
}
func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 1)
}
func TestCheckpointAndLoadFPMappings(t *testing.T) {
p, closer := newTestPersistence(t, 1)
defer closer.Close()
in := fpMappings{
1: map[string]model.Fingerprint{
"foo": 1,
"bar": 2,
},
3: map[string]model.Fingerprint{
"baz": 4,
},
}
if err := p.checkpointFPMappings(in); err != nil {
t.Fatal(err)
}
out, fp, err := p.loadFPMappings()
if err != nil {
t.Fatal(err)
}
if got, want := fp, model.Fingerprint(4); got != want {
t.Errorf("got highest FP %v, want %v", got, want)
}
if !reflect.DeepEqual(in, out) {
t.Errorf("got collision map %v, want %v", out, in)
}
}
func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
m1 := model.Metric{"n1": "v1"}
m2 := model.Metric{"n2": "v2"}
m3 := model.Metric{"n1": "v2"}
p.archiveMetric(1, m1, 2, 4)
p.archiveMetric(2, m2, 1, 6)
p.archiveMetric(3, m3, 5, 5)
expectedFPs := map[model.Time][]model.Fingerprint{
0: {},
1: {},
2: {2},
3: {1, 2},
4: {1, 2},
5: {1, 2},
6: {1, 2, 3},
}
for ts, want := range expectedFPs {
got, err := p.fingerprintsModifiedBefore(ts)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(want, got) {
t.Errorf("timestamp: %v, want FPs %v, got %v", ts, want, got)
}
}
unarchived, err := p.unarchiveMetric(1)
if err != nil {
t.Fatal(err)
}
if !unarchived {
t.Error("expected actual unarchival")
}
unarchived, err = p.unarchiveMetric(1)
if err != nil {
t.Fatal(err)
}
if unarchived {
t.Error("expected no unarchival")
}
expectedFPs = map[model.Time][]model.Fingerprint{
0: {},
1: {},
2: {2},
3: {2},
4: {2},
5: {2},
6: {2, 3},
}
for ts, want := range expectedFPs {
got, err := p.fingerprintsModifiedBefore(ts)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(want, got) {
t.Errorf("timestamp: %v, want FPs %v, got %v", ts, want, got)
}
}
}
func TestFingerprintsModifiedBeforeChunkType0(t *testing.T) {
testFingerprintsModifiedBefore(t, 0)
}
func TestFingerprintsModifiedBeforeChunkType1(t *testing.T) {
testFingerprintsModifiedBefore(t, 1)
}
func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
m1 := model.Metric{"n1": "v1"}
m2 := model.Metric{"n2": "v2"}
p.archiveMetric(1, m1, 2, 4)
p.archiveMetric(2, m2, 1, 6)
p.indexMetric(1, m1)
p.indexMetric(2, m2)
p.waitForIndexing()
outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
if err != nil {
t.Fatal(err)
}
want := model.Fingerprints{1}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
}
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
if err != nil {
t.Fatal(err)
}
want = model.Fingerprints{2}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
}
if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived {
t.Error("want FP 1 archived")
}
if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
t.Error("want FP 2 archived")
}
if err != p.purgeArchivedMetric(1) {
t.Fatal(err)
}
if err != p.purgeArchivedMetric(3) {
// Purging something that has not beet archived is not an error.
t.Fatal(err)
}
p.waitForIndexing()
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
if err != nil {
t.Fatal(err)
}
want = nil
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
}
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
if err != nil {
t.Fatal(err)
}
want = model.Fingerprints{2}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
}
if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived {
t.Error("want FP 1 not archived")
}
if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
t.Error("want FP 2 archived")
}
}
func TestDropArchivedMetricChunkType0(t *testing.T) {
testDropArchivedMetric(t, 0)
}
func TestDropArchivedMetricChunkType1(t *testing.T) {
testDropArchivedMetric(t, 1)
}
type incrementalBatch struct {
fpToMetric index.FingerprintMetricMapping
expectedLnToLvs index.LabelNameLabelValuesMapping
expectedLpToFps index.LabelPairFingerprintsMapping
}
func testIndexing(t *testing.T, encoding chunkEncoding) {
batches := []incrementalBatch{
{
fpToMetric: index.FingerprintMetricMapping{
0: {
model.MetricNameLabel: "metric_0",
"label_1": "value_1",
},
1: {
model.MetricNameLabel: "metric_0",
"label_2": "value_2",
"label_3": "value_3",
},
2: {
model.MetricNameLabel: "metric_1",
"label_1": "value_2",
},
},
expectedLnToLvs: index.LabelNameLabelValuesMapping{
model.MetricNameLabel: codable.LabelValueSet{
"metric_0": struct{}{},
"metric_1": struct{}{},
},
"label_1": codable.LabelValueSet{
"value_1": struct{}{},
"value_2": struct{}{},
},
"label_2": codable.LabelValueSet{
"value_2": struct{}{},
},
"label_3": codable.LabelValueSet{
"value_3": struct{}{},
},
},
expectedLpToFps: index.LabelPairFingerprintsMapping{
model.LabelPair{
Name: model.MetricNameLabel,
Value: "metric_0",
}: codable.FingerprintSet{0: struct{}{}, 1: struct{}{}},
model.LabelPair{
Name: model.MetricNameLabel,
Value: "metric_1",
}: codable.FingerprintSet{2: struct{}{}},
model.LabelPair{
Name: "label_1",
Value: "value_1",
}: codable.FingerprintSet{0: struct{}{}},
model.LabelPair{
Name: "label_1",
Value: "value_2",
}: codable.FingerprintSet{2: struct{}{}},
model.LabelPair{
Name: "label_2",
Value: "value_2",
}: codable.FingerprintSet{1: struct{}{}},
model.LabelPair{
Name: "label_3",
Value: "value_3",
}: codable.FingerprintSet{1: struct{}{}},
},
}, {
fpToMetric: index.FingerprintMetricMapping{
3: {
model.MetricNameLabel: "metric_0",
"label_1": "value_3",
},
4: {
model.MetricNameLabel: "metric_2",
"label_2": "value_2",
"label_3": "value_1",
},
5: {
model.MetricNameLabel: "metric_1",
"label_1": "value_3",
},
},
expectedLnToLvs: index.LabelNameLabelValuesMapping{
model.MetricNameLabel: codable.LabelValueSet{
"metric_0": struct{}{},
"metric_1": struct{}{},
"metric_2": struct{}{},
},
"label_1": codable.LabelValueSet{
"value_1": struct{}{},
"value_2": struct{}{},
"value_3": struct{}{},
},
"label_2": codable.LabelValueSet{
"value_2": struct{}{},
},
"label_3": codable.LabelValueSet{
"value_1": struct{}{},
"value_3": struct{}{},
},
},
expectedLpToFps: index.LabelPairFingerprintsMapping{
model.LabelPair{
Name: model.MetricNameLabel,
Value: "metric_0",
}: codable.FingerprintSet{0: struct{}{}, 1: struct{}{}, 3: struct{}{}},
model.LabelPair{
Name: model.MetricNameLabel,
Value: "metric_1",
}: codable.FingerprintSet{2: struct{}{}, 5: struct{}{}},
model.LabelPair{
Name: model.MetricNameLabel,
Value: "metric_2",
}: codable.FingerprintSet{4: struct{}{}},
model.LabelPair{
Name: "label_1",
Value: "value_1",
}: codable.FingerprintSet{0: struct{}{}},
model.LabelPair{
Name: "label_1",
Value: "value_2",
}: codable.FingerprintSet{2: struct{}{}},
model.LabelPair{
Name: "label_1",
Value: "value_3",
}: codable.FingerprintSet{3: struct{}{}, 5: struct{}{}},
model.LabelPair{
Name: "label_2",
Value: "value_2",
}: codable.FingerprintSet{1: struct{}{}, 4: struct{}{}},
model.LabelPair{
Name: "label_3",
Value: "value_1",
}: codable.FingerprintSet{4: struct{}{}},
model.LabelPair{
Name: "label_3",
Value: "value_3",
}: codable.FingerprintSet{1: struct{}{}},
},
},
}
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
indexedFpsToMetrics := index.FingerprintMetricMapping{}
for i, b := range batches {
for fp, m := range b.fpToMetric {
p.indexMetric(fp, m)
if err := p.archiveMetric(fp, m, 1, 2); err != nil {
t.Fatal(err)
}
indexedFpsToMetrics[fp] = m
}
verifyIndexedState(i, t, b, indexedFpsToMetrics, p)
}
for i := len(batches) - 1; i >= 0; i-- {
b := batches[i]
verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p)
for fp, m := range b.fpToMetric {
p.unindexMetric(fp, m)
unarchived, err := p.unarchiveMetric(fp)
if err != nil {
t.Fatal(err)
}
if !unarchived {
t.Errorf("%d. metric not unarchived", i)
}
delete(indexedFpsToMetrics, fp)
}
}
}
func TestIndexingChunkType0(t *testing.T) {
testIndexing(t, 0)
}
func TestIndexingChunkType1(t *testing.T) {
testIndexing(t, 1)
}
func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *persistence) {
p.waitForIndexing()
for fp, m := range indexedFpsToMetrics {
// Compare archived metrics with input metrics.
mOut, err := p.archivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !mOut.Equal(m) {
t.Errorf("%d. %v: Got: %s; want %s", i, fp, mOut, m)
}
// Check that archived metrics are in membership index.
has, first, last, err := p.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !has {
t.Errorf("%d. fingerprint %v not found", i, fp)
}
if first != 1 || last != 2 {
t.Errorf(
"%d. %v: Got first: %d, last %d; want first: %d, last %d",
i, fp, first, last, 1, 2,
)
}
}
// Compare label name -> label values mappings.
for ln, lvs := range b.expectedLnToLvs {
outLvs, err := p.labelValuesForLabelName(ln)
if err != nil {
t.Fatal(err)
}
outSet := codable.LabelValueSet{}
for _, lv := range outLvs {
outSet[lv] = struct{}{}
}
if !reflect.DeepEqual(lvs, outSet) {
t.Errorf("%d. label values don't match. Got: %v; want %v", i, outSet, lvs)
}
}
// Compare label pair -> fingerprints mappings.
for lp, fps := range b.expectedLpToFps {
outFPs, err := p.fingerprintsForLabelPair(lp)
if err != nil {
t.Fatal(err)
}
outSet := codable.FingerprintSet{}
for _, fp := range outFPs {
outSet[fp] = struct{}{}
}
if !reflect.DeepEqual(fps, outSet) {
t.Errorf("%d. %v: fingerprints don't match. Got: %v; want %v", i, lp, outSet, fps)
}
}
}
func TestQuranatineSeriesFile(t *testing.T) {
p, closer := newTestPersistence(t, 1)
defer closer.Close()
verify := func(fp model.Fingerprint, seriesFileShouldExist bool, contentHintFile ...string) {
var (
fpStr = fp.String()
originalFile = p.fileNameForFingerprint(fp)
quarantinedFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
hintFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+hintFileSuffix)
)
if _, err := os.Stat(originalFile); !os.IsNotExist(err) {
t.Errorf("Expected file %q to not exist.", originalFile)
}
if _, err := os.Stat(quarantinedFile); (os.IsNotExist(err) && seriesFileShouldExist) || (err == nil && !seriesFileShouldExist) {
t.Errorf("Unexpected state of quarantined file %q. Expected it to exist: %t. os.Stat returned: %s.", quarantinedFile, seriesFileShouldExist, err)
}
f, err := os.Open(hintFile)
defer f.Close()
if err != nil {
t.Errorf("Could not open hint file %q: %s", hintFile, err)
return
}
scanner := bufio.NewScanner(f)
for _, want := range contentHintFile {
if !scanner.Scan() {
t.Errorf("Unexpected end of hint file %q.", hintFile)
return
}
got := scanner.Text()
if want != got {
t.Errorf("Want hint line %q, got %q.", want, got)
}
}
if scanner.Scan() {
t.Errorf("Unexpected spurious content in hint file %q: %q", hintFile, scanner.Text())
}
}
if err := p.quarantineSeriesFile(0, nil, nil); err != nil {
t.Error(err)
}
verify(0, false, "[UNKNOWN METRIC]", "[UNKNOWN REASON]")
if err := p.quarantineSeriesFile(
1, errors.New("file does not exist"),
nil,
); err != nil {
t.Error(err)
}
verify(1, false, "[UNKNOWN METRIC]", "file does not exist")
if err := p.quarantineSeriesFile(
2, errors.New("file does not exist"),
model.Metric{"foo": "bar", "dings": "bums"},
); err != nil {
t.Error(err)
}
verify(2, false, `{dings="bums", foo="bar"}`, "file does not exist")
if err := p.quarantineSeriesFile(
3, nil,
model.Metric{"foo": "bar", "dings": "bums"},
); err != nil {
t.Error(err)
}
verify(3, false, `{dings="bums", foo="bar"}`, "[UNKNOWN REASON]")
err := os.Mkdir(filepath.Join(p.basePath, "00"), os.ModePerm)
if err != nil {
t.Fatal(err)
}
f, err := os.Create(p.fileNameForFingerprint(4))
if err != nil {
t.Fatal(err)
}
f.Close()
if err := p.quarantineSeriesFile(
4, errors.New("file exists"),
model.Metric{"sound": "cloud"},
); err != nil {
t.Error(err)
}
verify(4, true, `{sound="cloud"}`, "file exists")
if err := p.quarantineSeriesFile(4, nil, nil); err != nil {
t.Error(err)
}
// Overwrites hint file but leaves series file intact.
verify(4, true, "[UNKNOWN METRIC]", "[UNKNOWN REASON]")
if err := p.quarantineSeriesFile(
4, errors.New("file exists"),
model.Metric{"sound": "cloud"},
); err != nil {
t.Error(err)
}
// Overwrites everything.
verify(4, true, `{sound="cloud"}`, "file exists")
}
var fpStrings = []string{
"b004b821ca50ba26",
"b037c21e884e4fc5",
"b037de1e884e5469",
}
func BenchmarkLoadChunksSequentially(b *testing.B) {
p := persistence{
basePath: "fixtures",
bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
}
sequentialIndexes := make([]int, 47)
for i := range sequentialIndexes {
sequentialIndexes[i] = i
}
var fp model.Fingerprint
for i := 0; i < b.N; i++ {
for _, s := range fpStrings {
fp, _ = model.FingerprintFromString(s)
cds, err := p.loadChunks(fp, sequentialIndexes, 0)
if err != nil {
b.Error(err)
}
if len(cds) == 0 {
b.Error("could not read any chunks")
}
}
}
}
func BenchmarkLoadChunksRandomly(b *testing.B) {
p := persistence{
basePath: "fixtures",
bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
}
randomIndexes := []int{1, 5, 6, 8, 11, 14, 18, 23, 29, 33, 42, 46}
var fp model.Fingerprint
for i := 0; i < b.N; i++ {
for _, s := range fpStrings {
fp, _ = model.FingerprintFromString(s)
cds, err := p.loadChunks(fp, randomIndexes, 0)
if err != nil {
b.Error(err)
}
if len(cds) == 0 {
b.Error("could not read any chunks")
}
}
}
}
func BenchmarkLoadChunkDescs(b *testing.B) {
p := persistence{
basePath: "fixtures",
}
var fp model.Fingerprint
for i := 0; i < b.N; i++ {
for _, s := range fpStrings {
fp, _ = model.FingerprintFromString(s)
cds, err := p.loadChunkDescs(fp, 0)
if err != nil {
b.Error(err)
}
if len(cds) == 0 {
b.Error("could not read any chunk descs")
}
}
}
}