Initial in-memory arena implementation.

It is unbounded, and nothing uses it except for a gating flag in main.
This commit is contained in:
Matt T. Proud 2013-02-08 18:03:26 +01:00
parent 536e07432d
commit 13ae29b304
24 changed files with 3127 additions and 2666 deletions

View file

@ -19,7 +19,7 @@ import (
)
var (
EarliestTime = EncodeTime(time.Unix(0, 0))
EarliestTime = EncodeTime(time.Time{})
)
func EncodeTimeInto(dst []byte, t time.Time) {

14
main.go
View file

@ -21,7 +21,9 @@ import (
"github.com/prometheus/prometheus/retrieval/format"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/metric/leveldb"
"github.com/prometheus/prometheus/storage/metric/memory"
"github.com/prometheus/prometheus/web"
"log"
"os"
@ -35,6 +37,7 @@ var (
scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.")
ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.")
concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.")
memoryArena = flag.Bool("experimental.useMemoryArena", false, "Use in-memory timeseries arena.")
)
func main() {
@ -44,9 +47,14 @@ func main() {
log.Fatalf("Error loading configuration from %s: %v", *configFile, err)
}
persistence, err := leveldb.NewLevelDBMetricPersistence(*metricsStoragePath)
if err != nil {
log.Fatalf("Error opening storage: %v", err)
var persistence metric.MetricPersistence
if *memoryArena {
persistence = memory.NewMemorySeriesStorage()
} else {
persistence, err = leveldb.NewLevelDBMetricPersistence(*metricsStoragePath)
if err != nil {
log.Fatalf("Error opening storage: %v", err)
}
}
go func() {

View file

@ -14,7 +14,6 @@
package model
import (
"bytes"
"crypto/md5"
"encoding/hex"
"fmt"
@ -24,7 +23,7 @@ import (
const (
// XXX: Re-evaluate down the road.
reservedDelimiter = '"'
reservedDelimiter = `"`
)
// A Fingerprint is a simplified representation of an entity---e.g., a hash of
@ -49,6 +48,20 @@ type LabelSet map[LabelName]LabelValue
// a singleton and refers to one and only one stream of samples.
type Metric map[LabelName]LabelValue
type Fingerprints []Fingerprint
func (f Fingerprints) Len() int {
return len(f)
}
func (f Fingerprints) Less(i, j int) bool {
return sort.StringsAreSorted([]string{string(f[i]), string(f[j])})
}
func (f Fingerprints) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
// Fingerprint generates a fingerprint for this given Metric.
func (m Metric) Fingerprint() Fingerprint {
labelLength := len(m)
@ -62,13 +75,11 @@ func (m Metric) Fingerprint() Fingerprint {
summer := md5.New()
buffer := bytes.Buffer{}
for _, labelName := range labelNames {
buffer.WriteString(labelName)
buffer.WriteRune(reservedDelimiter)
buffer.WriteString(string(m[LabelName(labelName)]))
summer.Write([]byte(labelName))
summer.Write([]byte(reservedDelimiter))
summer.Write([]byte(m[LabelName(labelName)]))
}
summer.Write(buffer.Bytes())
return Fingerprint(hex.EncodeToString(summer.Sum(nil)))
}
@ -98,9 +109,23 @@ type SamplePair struct {
Timestamp time.Time
}
type Values []SamplePair
func (v Values) Len() int {
return len(v)
}
func (v Values) Less(i, j int) bool {
return v[i].Timestamp.Before(v[j].Timestamp)
}
func (v Values) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}
type SampleSet struct {
Metric Metric
Values []SamplePair
Values Values
}
type Interval struct {

View file

@ -40,7 +40,11 @@ func (p *PersistenceAdapter) getMetricsWithLabels(labels model.LabelSet) (metric
if err != nil {
return metrics, err
}
metrics = append(metrics, metric)
if metric == nil {
continue
}
metrics = append(metrics, *metric)
}
return
@ -74,7 +78,7 @@ func (p *PersistenceAdapter) GetBoundaryValues(labels model.LabelSet, interval *
sampleSets := []*model.SampleSet{}
for _, metric := range metrics {
// TODO: change to GetBoundaryValues() once it has the right return type.
sampleSet, err := p.persistence.GetRangeValues(metric, *interval, *p.stalenessPolicy)
sampleSet, err := p.persistence.GetRangeValues(metric, *interval)
if err != nil {
return nil, err
}
@ -97,7 +101,7 @@ func (p *PersistenceAdapter) GetRangeValues(labels model.LabelSet, interval *mod
sampleSets := []*model.SampleSet{}
for _, metric := range metrics {
sampleSet, err := p.persistence.GetRangeValues(metric, *interval, *p.stalenessPolicy)
sampleSet, err := p.persistence.GetRangeValues(metric, *interval)
if err != nil {
return nil, err
}

View file

@ -0,0 +1,228 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility/test"
"time"
)
func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
appendSample(p, model.Sample{
Value: 0,
Timestamp: time.Time{},
Metric: model.Metric{
"name": "my_metric",
"request_type": "your_mom",
},
}, t)
appendSample(p, model.Sample{
Value: 0,
Timestamp: time.Time{},
Metric: model.Metric{
"name": "my_metric",
"request_type": "your_dad",
},
}, t)
result, err := p.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("name"): model.LabelValue("my_metric"),
})
if err != nil {
t.Error(err)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
result, err = p.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_mom"),
})
if err != nil {
t.Error(err)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
result, err = p.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_dad"),
})
if err != nil {
t.Error(err)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
}
func GetFingerprintsForLabelNameTests(p MetricPersistence, t test.Tester) {
appendSample(p, model.Sample{
Value: 0,
Timestamp: time.Time{},
Metric: model.Metric{
"name": "my_metric",
"request_type": "your_mom",
"language": "english",
},
}, t)
appendSample(p, model.Sample{
Value: 0,
Timestamp: time.Time{},
Metric: model.Metric{
"name": "my_metric",
"request_type": "your_dad",
"sprache": "deutsch",
},
}, t)
b := model.LabelName("name")
result, err := p.GetFingerprintsForLabelName(b)
if err != nil {
t.Error(err)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
b = model.LabelName("request_type")
result, err = p.GetFingerprintsForLabelName(b)
if err != nil {
t.Error(err)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
b = model.LabelName("language")
result, err = p.GetFingerprintsForLabelName(b)
if err != nil {
t.Error(err)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
b = model.LabelName("sprache")
result, err = p.GetFingerprintsForLabelName(b)
if err != nil {
t.Error(err)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
}
func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
appendSample(p, model.Sample{
Value: 0,
Timestamp: time.Time{},
Metric: model.Metric{
"request_type": "your_mom",
},
}, t)
appendSample(p, model.Sample{
Value: 0,
Timestamp: time.Time{},
Metric: model.Metric{
"request_type": "your_dad",
"one-off": "value",
},
}, t)
result, err := p.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_mom"),
})
if err != nil {
t.Error(err)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
v, e := p.GetMetricForFingerprint(result[0])
if e != nil {
t.Error(e)
}
if v == nil {
t.Fatal("Did not expect nil.")
}
metric := *v
if len(metric) != 1 {
t.Errorf("Expected one-dimensional metric.")
}
if metric["request_type"] != "your_mom" {
t.Errorf("Expected metric to match.")
}
result, err = p.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_dad"),
})
if err != nil {
t.Error(err)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
v, e = p.GetMetricForFingerprint(result[0])
if v == nil {
t.Fatal("Did not expect nil.")
}
metric = *v
if e != nil {
t.Error(e)
}
if len(metric) != 2 {
t.Errorf("Expected one-dimensional metric.")
}
if metric["request_type"] != "your_dad" {
t.Errorf("Expected metric to match.")
}
if metric["one-off"] != "value" {
t.Errorf("Expected metric to match.")
}
}

View file

@ -34,17 +34,17 @@ type MetricPersistence interface {
// Get all of the metric fingerprints that are associated with the provided
// label set.
GetFingerprintsForLabelSet(model.LabelSet) ([]model.Fingerprint, error)
GetFingerprintsForLabelSet(model.LabelSet) (model.Fingerprints, error)
// Get all of the metric fingerprints that are associated for a given label
// name.
GetFingerprintsForLabelName(model.LabelName) ([]model.Fingerprint, error)
GetFingerprintsForLabelName(model.LabelName) (model.Fingerprints, error)
GetMetricForFingerprint(model.Fingerprint) (model.Metric, error)
GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error)
GetValueAtTime(model.Metric, time.Time, StalenessPolicy) (*model.Sample, error)
GetBoundaryValues(model.Metric, model.Interval, StalenessPolicy) (*model.Sample, *model.Sample, error)
GetRangeValues(model.Metric, model.Interval, StalenessPolicy) (*model.SampleSet, error)
GetRangeValues(model.Metric, model.Interval) (*model.SampleSet, error)
GetAllMetricNames() ([]string, error)

View file

@ -0,0 +1,55 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"github.com/prometheus/prometheus/storage/metric"
"testing"
)
var testGetFingerprintsForLabelSet = buildTestPersistence("get_fingerprints_for_labelset", metric.GetFingerprintsForLabelSetTests)
func TestGetFingerprintsForLabelSet(t *testing.T) {
testGetFingerprintsForLabelSet(t)
}
func BenchmarkGetFingerprintsForLabelSet(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetFingerprintsForLabelSet(b)
}
}
var testGetFingerprintsForLabelName = buildTestPersistence("get_fingerprints_for_labelname", metric.GetFingerprintsForLabelNameTests)
func TestGetFingerprintsForLabelName(t *testing.T) {
testGetFingerprintsForLabelName(t)
}
func BenchmarkGetFingerprintsForLabelName(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetFingerprintsForLabelName(b)
}
}
var testGetMetricForFingerprint = buildTestPersistence("get_metric_for_fingerprint", metric.GetMetricForFingerprintTests)
func TestGetMetricForFingerprint(t *testing.T) {
testGetMetricForFingerprint(t)
}
func BenchmarkGetMetricForFingerprint(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetMetricForFingerprint(b)
}
}

View file

@ -1,932 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/utility/test"
"io/ioutil"
"math"
"math/rand"
"os"
"testing"
"testing/quick"
"time"
)
const (
stochasticMaximumVariance = 8
)
var testBasicLifecycle func(t test.Tester) = func(t test.Tester) {
temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "leveldb_metric_persistence_test")
if temporaryDirectoryErr != nil {
t.Errorf("Could not create test directory: %q\n", temporaryDirectoryErr)
return
}
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, openErr := NewLevelDBMetricPersistence(temporaryDirectory)
if openErr != nil {
t.Errorf("Could not create LevelDB Metric Persistence: %q\n", openErr)
}
if persistence == nil {
t.Errorf("Received nil LevelDB Metric Persistence.\n")
return
}
closeErr := persistence.Close()
if closeErr != nil {
t.Errorf("Could not close LevelDB Metric Persistence: %q\n", closeErr)
}
}
func TestBasicLifecycle(t *testing.T) {
testBasicLifecycle(t)
}
func BenchmarkBasicLifecycle(b *testing.B) {
for i := 0; i < b.N; i++ {
testBasicLifecycle(b)
}
}
var testReadEmpty func(t test.Tester) = func(t test.Tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
hasLabelPair := func(x int) bool {
name := string(x)
value := string(x)
dto := &dto.LabelPair{
Name: proto.String(name),
Value: proto.String(value),
}
has, hasErr := persistence.HasLabelPair(dto)
if hasErr != nil {
return false
}
return has == false
}
if hasPairErr := quick.Check(hasLabelPair, nil); hasPairErr != nil {
t.Error(hasPairErr)
}
hasLabelName := func(x int) bool {
name := string(x)
dto := &dto.LabelName{
Name: proto.String(name),
}
has, hasErr := persistence.HasLabelName(dto)
if hasErr != nil {
return false
}
return has == false
}
if hasNameErr := quick.Check(hasLabelName, nil); hasNameErr != nil {
t.Error(hasNameErr)
}
getLabelPairFingerprints := func(x int) bool {
name := string(x)
value := string(x)
dto := &dto.LabelPair{
Name: proto.String(name),
Value: proto.String(value),
}
fingerprints, fingerprintsErr := persistence.getFingerprintsForLabelSet(dto)
if fingerprintsErr != nil {
return false
}
if fingerprints == nil {
return false
}
return len(fingerprints.Member) == 0
}
if labelPairFingerprintsErr := quick.Check(getLabelPairFingerprints, nil); labelPairFingerprintsErr != nil {
t.Error(labelPairFingerprintsErr)
}
getLabelNameFingerprints := func(x int) bool {
name := string(x)
dto := &dto.LabelName{
Name: proto.String(name),
}
fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(dto)
if fingerprintsErr != nil {
return false
}
if fingerprints == nil {
return false
}
return len(fingerprints.Member) == 0
}
if labelNameFingerprintsErr := quick.Check(getLabelNameFingerprints, nil); labelNameFingerprintsErr != nil {
t.Error(labelNameFingerprintsErr)
}
}
func TestReadEmpty(t *testing.T) {
testReadEmpty(t)
}
func BenchmarkReadEmpty(b *testing.B) {
for i := 0; i < b.N; i++ {
testReadEmpty(b)
}
}
var testAppendSampleAsPureSparseAppend = func(t test.Tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendSample := func(x int) bool {
v := model.SampleValue(x)
t := time.Unix(int64(x), int64(x))
l := model.Metric{model.LabelName(x): model.LabelValue(x)}
sample := model.Sample{
Value: v,
Timestamp: t,
Metric: l,
}
appendErr := persistence.AppendSample(sample)
return appendErr == nil
}
if appendErr := quick.Check(appendSample, nil); appendErr != nil {
t.Error(appendErr)
}
}
func TestAppendSampleAsPureSparseAppend(t *testing.T) {
testAppendSampleAsPureSparseAppend(t)
}
func BenchmarkAppendSampleAsPureSparseAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsPureSparseAppend(b)
}
}
var testAppendSampleAsSparseAppendWithReads func(t test.Tester) = func(t test.Tester) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendSample := func(x int) bool {
v := model.SampleValue(x)
t := time.Unix(int64(x), int64(x))
l := model.Metric{model.LabelName(x): model.LabelValue(x)}
sample := model.Sample{
Value: v,
Timestamp: t,
Metric: l,
}
appendErr := persistence.AppendSample(sample)
if appendErr != nil {
return false
}
labelNameDTO := &dto.LabelName{
Name: proto.String(string(x)),
}
hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelNameDTO)
if hasLabelNameErr != nil {
return false
}
if !hasLabelName {
return false
}
labelPairDTO := &dto.LabelPair{
Name: proto.String(string(x)),
Value: proto.String(string(x)),
}
hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPairDTO)
if hasLabelPairErr != nil {
return false
}
if !hasLabelPair {
return false
}
labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelNameDTO)
if labelNameFingerprintsErr != nil {
return false
}
if labelNameFingerprints == nil {
return false
}
if len(labelNameFingerprints.Member) != 1 {
return false
}
labelPairFingerprints, labelPairFingerprintsErr := persistence.getFingerprintsForLabelSet(labelPairDTO)
if labelPairFingerprintsErr != nil {
return false
}
if labelPairFingerprints == nil {
return false
}
if len(labelPairFingerprints.Member) != 1 {
return false
}
return true
}
if appendErr := quick.Check(appendSample, nil); appendErr != nil {
t.Error(appendErr)
}
}
func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
testAppendSampleAsSparseAppendWithReads(t)
}
func BenchmarkAppendSampleAsSparseAppendWithReads(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsSparseAppendWithReads(b)
}
}
func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendSample := func(x int) bool {
sample := model.Sample{
Value: model.SampleValue(float32(x)),
Timestamp: time.Unix(int64(x), 0),
Metric: model.Metric{"name": "my_metric"},
}
appendErr := persistence.AppendSample(sample)
return appendErr == nil
}
if appendErr := quick.Check(appendSample, nil); appendErr != nil {
t.Error(appendErr)
}
}
func TestStochastic(t *testing.T) {
stochastic := func(x int) bool {
s := time.Now()
seed := rand.NewSource(int64(x))
random := rand.New(seed)
numberOfMetrics := random.Intn(stochasticMaximumVariance) + 1
numberOfSharedLabels := random.Intn(stochasticMaximumVariance)
numberOfUnsharedLabels := random.Intn(stochasticMaximumVariance)
numberOfSamples := random.Intn(stochasticMaximumVariance) + 2
numberOfRangeScans := random.Intn(stochasticMaximumVariance)
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
metricTimestamps := make(map[int]map[int64]bool)
metricEarliestSample := make(map[int]int64)
metricNewestSample := make(map[int]int64)
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
sample := model.Sample{
Metric: model.Metric{},
}
v := model.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex))
sample.Metric["name"] = v
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
l := model.LabelName(fmt.Sprintf("shared_label_%d", sharedLabelIndex))
v := model.LabelValue(fmt.Sprintf("label_%d", sharedLabelIndex))
sample.Metric[l] = v
}
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
l := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex))
v := model.LabelValue(fmt.Sprintf("private_label_%d", unsharedLabelIndex))
sample.Metric[l] = v
}
timestamps := make(map[int64]bool)
metricTimestamps[metricIndex] = timestamps
var newestSample int64 = math.MinInt64
var oldestSample int64 = math.MaxInt64
var nextTimestamp func() int64
nextTimestamp = func() int64 {
var candidate int64
candidate = random.Int63n(math.MaxInt32 - 1)
if _, has := timestamps[candidate]; has {
candidate = nextTimestamp()
}
timestamps[candidate] = true
if candidate < oldestSample {
oldestSample = candidate
}
if candidate > newestSample {
newestSample = candidate
}
return candidate
}
for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ {
sample.Timestamp = time.Unix(nextTimestamp(), 0)
sample.Value = model.SampleValue(sampleIndex)
appendErr := persistence.AppendSample(sample)
if appendErr != nil {
return false
}
}
metricEarliestSample[metricIndex] = oldestSample
metricNewestSample[metricIndex] = newestSample
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
labelPair := &dto.LabelPair{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)),
}
hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPair)
if hasLabelPairErr != nil {
return false
}
if hasLabelPair != true {
return false
}
labelName := &dto.LabelName{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
}
hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelName)
if hasLabelNameErr != nil {
return false
}
if hasLabelName != true {
return false
}
}
}
for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ {
labelName := &dto.LabelName{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)),
}
fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName)
if fingerprintsErr != nil {
return false
}
if fingerprints == nil {
return false
}
if len(fingerprints.Member) != numberOfMetrics {
return false
}
}
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
labelPair := &dto.LabelPair{
Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)),
Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)),
}
hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPair)
if hasLabelPairErr != nil {
return false
}
if hasLabelPair != true {
return false
}
labelPairFingerprints, labelPairFingerprintsErr := persistence.getFingerprintsForLabelSet(labelPair)
if labelPairFingerprintsErr != nil {
return false
}
if labelPairFingerprints == nil {
return false
}
if len(labelPairFingerprints.Member) != 1 {
return false
}
labelName := &dto.LabelName{
Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)),
}
hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelName)
if hasLabelNameErr != nil {
return false
}
if hasLabelName != true {
return false
}
labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelName)
if labelNameFingerprintsErr != nil {
return false
}
if labelNameFingerprints == nil {
return false
}
if len(labelNameFingerprints.Member) != 1 {
return false
}
}
metric := make(model.Metric)
metric["name"] = model.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex))
for i := 0; i < numberOfSharedLabels; i++ {
l := model.LabelName(fmt.Sprintf("shared_label_%d", i))
v := model.LabelValue(fmt.Sprintf("label_%d", i))
metric[l] = v
}
for i := 0; i < numberOfUnsharedLabels; i++ {
l := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i))
v := model.LabelValue(fmt.Sprintf("private_label_%d", i))
metric[l] = v
}
for i := 0; i < numberOfRangeScans; i++ {
timestamps := metricTimestamps[metricIndex]
var first int64 = 0
var second int64 = 0
for {
firstCandidate := random.Int63n(int64(len(timestamps)))
secondCandidate := random.Int63n(int64(len(timestamps)))
smallest := int64(-1)
largest := int64(-1)
if firstCandidate == secondCandidate {
continue
} else if firstCandidate > secondCandidate {
largest = firstCandidate
smallest = secondCandidate
} else {
largest = secondCandidate
smallest = firstCandidate
}
j := int64(0)
for i := range timestamps {
if j == smallest {
first = i
} else if j == largest {
second = i
break
}
j++
}
break
}
begin := first
end := second
if second < first {
begin, end = second, first
}
interval := model.Interval{
OldestInclusive: time.Unix(begin, 0),
NewestInclusive: time.Unix(end, 0),
}
rangeValues, rangeErr := persistence.GetSamplesForMetric(metric, interval)
if rangeErr != nil {
return false
}
if len(rangeValues) < 2 {
return false
}
}
}
fmt.Printf("Duration %q\n", time.Now().Sub(s))
return true
}
if stochasticError := quick.Check(stochastic, nil); stochasticError != nil {
t.Error(stochasticError)
}
}
func TestGetFingerprintsForLabelSet(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendErr := persistence.AppendSample(model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(0, 0),
Metric: model.Metric{
"name": "my_metric",
"request_type": "your_mom",
},
})
if appendErr != nil {
t.Error(appendErr)
}
appendErr = persistence.AppendSample(model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(int64(0), 0),
Metric: model.Metric{
"name": "my_metric",
"request_type": "your_dad",
},
})
if appendErr != nil {
t.Error(appendErr)
}
result, getErr := persistence.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("name"): model.LabelValue("my_metric"),
})
if getErr != nil {
t.Error(getErr)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
result, getErr = persistence.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_mom"),
})
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
result, getErr = persistence.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_dad"),
})
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
}
func TestGetFingerprintsForLabelName(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendErr := persistence.AppendSample(model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(0, 0),
Metric: model.Metric{
"name": "my_metric",
"request_type": "your_mom",
"language": "english",
},
})
if appendErr != nil {
t.Error(appendErr)
}
appendErr = persistence.AppendSample(model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(int64(0), 0),
Metric: model.Metric{
"name": "my_metric",
"request_type": "your_dad",
"sprache": "deutsch",
},
})
if appendErr != nil {
t.Error(appendErr)
}
b := model.LabelName("name")
result, getErr := persistence.GetFingerprintsForLabelName(b)
if getErr != nil {
t.Error(getErr)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
b = model.LabelName("request_type")
result, getErr = persistence.GetFingerprintsForLabelName(b)
if getErr != nil {
t.Error(getErr)
}
if len(result) != 2 {
t.Errorf("Expected two elements.")
}
b = model.LabelName("language")
result, getErr = persistence.GetFingerprintsForLabelName(b)
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
b = model.LabelName("sprache")
result, getErr = persistence.GetFingerprintsForLabelName(b)
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
}
func TestGetMetricForFingerprint(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendErr := persistence.AppendSample(model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(0, 0),
Metric: model.Metric{
"request_type": "your_mom",
},
})
if appendErr != nil {
t.Error(appendErr)
}
appendErr = persistence.AppendSample(model.Sample{
Value: model.SampleValue(0),
Timestamp: time.Unix(int64(0), 0),
Metric: model.Metric{
"request_type": "your_dad",
"one-off": "value",
},
})
if appendErr != nil {
t.Error(appendErr)
}
result, getErr := persistence.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_mom"),
})
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
v, e := persistence.GetMetricForFingerprint(result[0])
if e != nil {
t.Error(e)
}
if len(v) != 1 {
t.Errorf("Expected one-dimensional metric.")
}
if v["request_type"] != "your_mom" {
t.Errorf("Expected metric to match.")
}
result, getErr = persistence.GetFingerprintsForLabelSet(model.LabelSet{
model.LabelName("request_type"): model.LabelValue("your_dad"),
})
if getErr != nil {
t.Error(getErr)
}
if len(result) != 1 {
t.Errorf("Expected one element.")
}
v, e = persistence.GetMetricForFingerprint(result[0])
if e != nil {
t.Error(e)
}
if len(v) != 2 {
t.Errorf("Expected one-dimensional metric.")
}
if v["request_type"] != "your_dad" {
t.Errorf("Expected metric to match.")
}
if v["one-off"] != "value" {
t.Errorf("Expected metric to match.")
}
}

View file

@ -34,8 +34,6 @@ var (
type leveldbOpener func()
func (l *LevelDBMetricPersistence) Close() error {
log.Printf("Closing LevelDBPersistence storage containers...")
var persistences = []struct {
name string
closer io.Closer
@ -70,7 +68,6 @@ func (l *LevelDBMetricPersistence) Close() error {
go func(name string, closer io.Closer) {
if closer != nil {
log.Printf("Closing LevelDBPersistence storage container: %s\n", name)
closingError := closer.Close()
if closingError != nil {
@ -92,14 +89,10 @@ func (l *LevelDBMetricPersistence) Close() error {
}
}
log.Printf("Successfully closed all LevelDBPersistence storage containers.")
return nil
}
func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {
log.Printf("Opening LevelDBPersistence storage containers...")
errorChannel := make(chan error, 5)
emission := &LevelDBMetricPersistence{}
@ -151,11 +144,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
}
for _, subsystem := range subsystemOpeners {
name := subsystem.name
opener := subsystem.opener
log.Printf("Opening LevelDBPersistence storage container: %s\n", name)
go opener()
}
@ -168,9 +157,6 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
return
}
}
log.Printf("Successfully opened all LevelDBPersistence storage containers.\n")
persistence = emission
return

View file

@ -22,6 +22,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility"
"sort"
"time"
)
@ -62,18 +63,14 @@ func fingerprintsEqual(l *dto.Fingerprint, r *dto.Fingerprint) bool {
type sampleKeyPredicate func(k *dto.SampleKey) bool
func keyIsOlderThan(t time.Time) sampleKeyPredicate {
unix := t.Unix()
return func(k *dto.SampleKey) bool {
return indexable.DecodeTime(k.Timestamp).Unix() > unix
return indexable.DecodeTime(k.Timestamp).After(t)
}
}
func keyIsAtMostOld(t time.Time) sampleKeyPredicate {
unix := t.Unix()
return func(k *dto.SampleKey) bool {
return indexable.DecodeTime(k.Timestamp).Unix() <= unix
return !indexable.DecodeTime(k.Timestamp).After(t)
}
}
@ -180,7 +177,7 @@ func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (c
return
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fps []model.Fingerprint, err error) {
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fps model.Fingerprints, err error) {
begin := time.Now()
defer func() {
@ -230,7 +227,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
return
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.LabelName) (fps []model.Fingerprint, err error) {
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.LabelName) (fps model.Fingerprints, err error) {
begin := time.Now()
defer func() {
@ -259,7 +256,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
return
}
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) (m model.Metric, err error) {
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
begin := time.Now()
defer func() {
@ -279,12 +276,16 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
return
}
m = model.Metric{}
metric := model.Metric{}
for _, v := range unmarshaled.LabelPair {
m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value)
metric[model.LabelName(*v.Name)] = model.LabelValue(*v.Value)
}
// Explicit address passing here shaves immense amounts of time off of the
// code flow due to less tight-loop dereferencing.
m = &metric
return
}
@ -551,7 +552,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m model.Metric, t time.Time, s
return
}
func (l *LevelDBMetricPersistence) GetRangeValues(m model.Metric, i model.Interval, s metric.StalenessPolicy) (v *model.SampleSet, err error) {
func (l *LevelDBMetricPersistence) GetRangeValues(m model.Metric, i model.Interval) (v *model.SampleSet, err error) {
begin := time.Now()
defer func() {
@ -612,6 +613,12 @@ func (l *LevelDBMetricPersistence) GetRangeValues(m model.Metric, i model.Interv
})
}
// XXX: We should not explicitly sort here but rather rely on the datastore.
// This adds appreciable overhead.
if v != nil {
sort.Sort(v.Values)
}
return
}
@ -623,7 +630,10 @@ func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error
if err != nil {
return
}
return unmarshaled, nil
out = unmarshaled
return
}
func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) {

View file

@ -14,63 +14,18 @@
package leveldb
import (
"github.com/prometheus/prometheus/model"
"io/ioutil"
"os"
"github.com/prometheus/prometheus/storage/metric"
"testing"
"time"
)
func TestGetFingerprintsForLabelSetUsesAnd(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "test_get_fingerprints_for_label_set_uses_and")
var testGetFingerprintsForLabelSetUsesAndForLabelMatching = buildTestPersistence("get_fingerprints_for_labelset_uses_and_for_label_matching", metric.GetFingerprintsForLabelSetUsesAndForLabelMatchingTests)
defer func() {
err := os.RemoveAll(temporaryDirectory)
if err != nil {
t.Errorf("could not remove temporary directory: %f", err)
}
}()
func TestGetFingerprintsForLabelSetUsesAndForLabelMatching(t *testing.T) {
testGetFingerprintsForLabelSetUsesAndForLabelMatching(t)
}
persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory)
defer persistence.Close()
metrics := []map[string]string{
{"name": "request_metrics_latency_equal_tallying_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{"name": "requests_metrics_latency_equal_accumulating_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{"name": "requests_metrics_latency_logarithmic_accumulating_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{"name": "requests_metrics_latency_logarithmic_tallying_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{"name": "targets_healthy_scrape_latency_ms", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
}
for _, metric := range metrics {
m := model.Metric{}
for k, v := range metric {
m[model.LabelName(k)] = model.LabelValue(v)
}
err := persistence.AppendSample(model.Sample{
Value: model.SampleValue(0.0),
Timestamp: time.Now(),
Metric: m,
})
if err != nil {
t.Errorf("could not create base sample: %s", err)
}
}
labelSet := model.LabelSet{
"name": "targets_healthy_scrape_latency_ms",
"percentile": "0.010000",
}
fingerprints, err := persistence.GetFingerprintsForLabelSet(labelSet)
if err != nil {
t.Errorf("could not get labels: %s", err)
}
if len(fingerprints) != 1 {
t.Errorf("did not get a single metric as is expected, got %s", fingerprints)
func BenchmarkGetFingerprintsForLabelSetUsesAndForLabelMatching(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetFingerprintsForLabelSetUsesAndForLabelMatching(b)
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,109 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
"io/ioutil"
"testing"
)
var testBasicLifecycle = buildTestPersistence("basic_lifecycle", metric.BasicLifecycleTests)
func TestBasicLifecycle(t *testing.T) {
testBasicLifecycle(t)
}
func BenchmarkBasicLifecycle(b *testing.B) {
for i := 0; i < b.N; i++ {
testBasicLifecycle(b)
}
}
var testReadEmpty = buildTestPersistence("read_empty", metric.ReadEmptyTests)
func TestReadEmpty(t *testing.T) {
testReadEmpty(t)
}
func BenchmarkReadEmpty(b *testing.B) {
for i := 0; i < b.N; i++ {
testReadEmpty(b)
}
}
var testAppendSampleAsPureSparseAppend = buildTestPersistence("append_sample_as_pure_sparse_append", metric.AppendSampleAsPureSparseAppendTests)
func TestAppendSampleAsPureSparseAppend(t *testing.T) {
testAppendSampleAsPureSparseAppend(t)
}
func BenchmarkAppendSampleAsPureSparseAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsPureSparseAppend(b)
}
}
var testAppendSampleAsSparseAppendWithReads = buildTestPersistence("append_sample_as_sparse_append_with_reads", metric.AppendSampleAsSparseAppendWithReadsTests)
func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
testAppendSampleAsSparseAppendWithReads(t)
}
func BenchmarkAppendSampleAsSparseAppendWithReads(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsSparseAppendWithReads(b)
}
}
var testAppendSampleAsPureSingleEntityAppend = buildTestPersistence("append_sample_as_pure_single_entity_append", metric.AppendSampleAsPureSingleEntityAppendTests)
func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) {
testAppendSampleAsPureSingleEntityAppend(t)
}
func BenchmarkAppendSampleAsPureSingleEntityAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsPureSingleEntityAppend(b)
}
}
func testStochastic(t test.Tester) {
persistenceMaker := func() metric.MetricPersistence {
temporaryDirectory, err := ioutil.TempDir("", "test_leveldb_stochastic")
if err != nil {
t.Errorf("Could not create test directory: %q\n", err)
}
p, err := NewLevelDBMetricPersistence(temporaryDirectory)
if err != nil {
t.Errorf("Could not start up LevelDB: %q\n", err)
}
return p
}
metric.StochasticTests(persistenceMaker, t)
}
func TestStochastic(t *testing.T) {
testStochastic(t)
}
func BenchmarkStochastic(b *testing.B) {
for i := 0; i < b.N; i++ {
testStochastic(b)
}
}

View file

@ -0,0 +1,84 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"fmt"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
"io"
"io/ioutil"
"os"
)
type purger struct {
path string
}
func (p purger) Close() error {
return os.RemoveAll(p.path)
}
func buildTestPersistencesMaker(name string, t test.Tester) func() (metric.MetricPersistence, io.Closer) {
return func() (metric.MetricPersistence, io.Closer) {
temporaryDirectory, err := ioutil.TempDir("", "get_value_at_time")
if err != nil {
t.Errorf("Could not create test directory: %q\n", err)
}
p, err := NewLevelDBMetricPersistence(temporaryDirectory)
if err != nil {
t.Errorf("Could not start up LevelDB: %q\n", err)
}
purger := purger{
path: temporaryDirectory,
}
return p, purger
}
}
func buildTestPersistence(name string, f func(p metric.MetricPersistence, t test.Tester)) func(t test.Tester) {
return func(t test.Tester) {
temporaryDirectory, err := ioutil.TempDir("", fmt.Sprintf("test_leveldb_%s", name))
if err != nil {
t.Errorf("Could not create test directory: %q\n", err)
return
}
defer func() {
err := os.RemoveAll(temporaryDirectory)
if err != nil {
t.Errorf("Could not remove temporary directory: %q\n", err)
}
}()
p, err := NewLevelDBMetricPersistence(temporaryDirectory)
if err != nil {
t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err)
}
defer func() {
err := p.Close()
if err != nil {
t.Errorf("Anomaly while closing database: %q\n", err)
}
}()
f(p, t)
}
}

View file

@ -0,0 +1,55 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"github.com/prometheus/prometheus/storage/metric"
"testing"
)
var testGetFingerprintsForLabelSet = buildTestPersistence(metric.GetFingerprintsForLabelSetTests)
func TestGetFingerprintsForLabelSet(t *testing.T) {
testGetFingerprintsForLabelSet(t)
}
func BenchmarkGetFingerprintsForLabelSet(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetFingerprintsForLabelSet(b)
}
}
var testGetFingerprintsForLabelName = buildTestPersistence(metric.GetFingerprintsForLabelNameTests)
func TestGetFingerprintsForLabelName(t *testing.T) {
testGetFingerprintsForLabelName(t)
}
func BenchmarkGetFingerprintsForLabelName(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetFingerprintsForLabelName(b)
}
}
var testGetMetricForFingerprint = buildTestPersistence(metric.GetMetricForFingerprintTests)
func TestGetMetricForFingerprint(t *testing.T) {
testGetMetricForFingerprint(t)
}
func BenchmarkGetMetricForFingerprint(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetMetricForFingerprint(b)
}
}

View file

@ -0,0 +1,23 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"github.com/prometheus/prometheus/storage/metric"
"testing"
)
func TestInterfaceAdherence(t *testing.T) {
var _ metric.MetricPersistence = NewMemorySeriesStorage()
}

View file

@ -0,0 +1,303 @@
package memory
import (
"fmt"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility"
"github.com/ryszard/goskiplist/skiplist"
"sort"
"time"
)
const (
reservedDelimiter = `"`
)
type skipListTime time.Time
func (t skipListTime) LessThan(o skiplist.Ordered) bool {
// return time.Time(t).Before(time.Time(o.(skipListTime)))
return time.Time(o.(skipListTime)).Before(time.Time(t))
}
type stream struct {
metric model.Metric
values *skiplist.SkipList
}
func (s *stream) add(sample model.Sample) {
s.values.Set(skipListTime(sample.Timestamp), sample.Value)
}
func newStream(metric model.Metric) *stream {
return &stream{
values: skiplist.New(),
metric: metric,
}
}
type memorySeriesStorage struct {
fingerprintToSeries map[model.Fingerprint]*stream
labelPairToFingerprints map[string]model.Fingerprints
labelNameToFingerprints map[model.LabelName]model.Fingerprints
}
func (s *memorySeriesStorage) AppendSample(sample model.Sample) (err error) {
metric := sample.Metric
fingerprint := metric.Fingerprint()
series, ok := s.fingerprintToSeries[fingerprint]
if !ok {
series = newStream(metric)
s.fingerprintToSeries[fingerprint] = series
for k, v := range metric {
labelPair := fmt.Sprintf("%s%s%s", k, reservedDelimiter, v)
labelPairValues := s.labelPairToFingerprints[labelPair]
labelPairValues = append(labelPairValues, fingerprint)
s.labelPairToFingerprints[labelPair] = labelPairValues
labelNameValues := s.labelNameToFingerprints[k]
labelNameValues = append(labelNameValues, fingerprint)
s.labelNameToFingerprints[k] = labelNameValues
}
}
series.add(sample)
return
}
func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) {
sets := []utility.Set{}
for k, v := range l {
signature := fmt.Sprintf("%s%s%s", k, reservedDelimiter, v)
values := s.labelPairToFingerprints[signature]
set := utility.Set{}
for _, fingerprint := range values {
set.Add(fingerprint)
}
sets = append(sets, set)
}
setCount := len(sets)
if setCount == 0 {
return
}
base := sets[0]
for i := 1; i < setCount; i++ {
base = base.Intersection(sets[i])
}
for _, e := range base.Elements() {
fingerprint := e.(model.Fingerprint)
fingerprints = append(fingerprints, fingerprint)
}
return
}
func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (fingerprints model.Fingerprints, err error) {
values := s.labelNameToFingerprints[l]
fingerprints = append(fingerprints, values...)
return
}
func (s *memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (metric *model.Metric, err error) {
series, ok := s.fingerprintToSeries[f]
if !ok {
return
}
metric = &series.metric
return
}
// XXX: Terrible wart.
func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) model.SampleValue {
yDelta := y2 - y1
xDelta := x2.Sub(x1)
dDt := yDelta / float32(xDelta)
offset := float32(e.Sub(x1))
return model.SampleValue(y1 + (offset * dDt))
}
func (s *memorySeriesStorage) GetValueAtTime(m model.Metric, t time.Time, p metric.StalenessPolicy) (sample *model.Sample, err error) {
fingerprint := m.Fingerprint()
series, ok := s.fingerprintToSeries[fingerprint]
if !ok {
return
}
iterator := series.values.Seek(skipListTime(t))
if iterator == nil {
return
}
foundTime := time.Time(iterator.Key().(skipListTime))
if foundTime.Equal(t) {
sample = &model.Sample{
Metric: m,
Value: iterator.Value().(model.SampleValue),
Timestamp: t,
}
return
}
if t.Sub(foundTime) > p.DeltaAllowance {
return
}
secondTime := foundTime
secondValue := iterator.Value().(model.SampleValue)
if !iterator.Previous() {
sample = &model.Sample{
Metric: m,
Value: iterator.Value().(model.SampleValue),
Timestamp: t,
}
return
}
firstTime := time.Time(iterator.Key().(skipListTime))
if t.Sub(firstTime) > p.DeltaAllowance {
return
}
if firstTime.Sub(secondTime) > p.DeltaAllowance {
return
}
firstValue := iterator.Value().(model.SampleValue)
sample = &model.Sample{
Metric: m,
Value: interpolate(firstTime, secondTime, float32(firstValue), float32(secondValue), t),
Timestamp: t,
}
return
}
func (s *memorySeriesStorage) GetBoundaryValues(m model.Metric, i model.Interval, p metric.StalenessPolicy) (first *model.Sample, second *model.Sample, err error) {
first, err = s.GetValueAtTime(m, i.OldestInclusive, p)
if err != nil {
return
} else if first == nil {
return
}
second, err = s.GetValueAtTime(m, i.NewestInclusive, p)
if err != nil {
return
} else if second == nil {
first = nil
}
return
}
func (s *memorySeriesStorage) GetRangeValues(m model.Metric, i model.Interval) (samples *model.SampleSet, err error) {
fingerprint := m.Fingerprint()
series, ok := s.fingerprintToSeries[fingerprint]
if !ok {
return
}
samples = &model.SampleSet{
Metric: m,
}
iterator := series.values.Seek(skipListTime(i.NewestInclusive))
if iterator == nil {
return
}
for {
timestamp := time.Time(iterator.Key().(skipListTime))
if timestamp.Before(i.OldestInclusive) {
break
}
samples.Values = append(samples.Values,
model.SamplePair{
Value: model.SampleValue(iterator.Value().(model.SampleValue)),
Timestamp: timestamp,
})
if !iterator.Next() {
break
}
}
// XXX: We should not explicitly sort here but rather rely on the datastore.
// This adds appreciable overhead.
if samples != nil {
sort.Sort(samples.Values)
}
return
}
func (s *memorySeriesStorage) Close() (err error) {
for fingerprint := range s.fingerprintToSeries {
delete(s.fingerprintToSeries, fingerprint)
}
for labelPair := range s.labelPairToFingerprints {
delete(s.labelPairToFingerprints, labelPair)
}
for labelName := range s.labelNameToFingerprints {
delete(s.labelNameToFingerprints, labelName)
}
return
}
func (s *memorySeriesStorage) GetAllMetricNames() (names []string, err error) {
panic("not implemented")
return
}
func (s *memorySeriesStorage) GetAllLabelNames() (names []string, err error) {
panic("not implemented")
return
}
func (s *memorySeriesStorage) GetAllLabelPairs() (pairs []model.LabelSet, err error) {
panic("not implemented")
return
}
func (s *memorySeriesStorage) GetAllMetrics() (metrics []model.LabelSet, err error) {
panic("not implemented")
return
}
func NewMemorySeriesStorage() metric.MetricPersistence {
return newMemorySeriesStorage()
}
func newMemorySeriesStorage() *memorySeriesStorage {
return &memorySeriesStorage{
fingerprintToSeries: make(map[model.Fingerprint]*stream),
labelPairToFingerprints: make(map[string]model.Fingerprints),
labelNameToFingerprints: make(map[model.LabelName]model.Fingerprints),
}
}

View file

@ -0,0 +1,31 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"github.com/prometheus/prometheus/storage/metric"
"testing"
)
var testGetFingerprintsForLabelSetUsesAndForLabelMatching = buildTestPersistence(metric.GetFingerprintsForLabelSetUsesAndForLabelMatchingTests)
func TestGetFingerprintsForLabelSetUsesAndForLabelMatching(t *testing.T) {
testGetFingerprintsForLabelSetUsesAndForLabelMatching(t)
}
func BenchmarkGetFingerprintsForLabelSetUsesAndLabelMatching(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetFingerprintsForLabelSetUsesAndForLabelMatching(b)
}
}

View file

@ -0,0 +1,76 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
"io"
"io/ioutil"
"testing"
)
func testGetValueAtTime(t test.Tester) {
persistenceMaker := func() (metric.MetricPersistence, io.Closer) {
return NewMemorySeriesStorage(), ioutil.NopCloser(nil)
}
metric.GetValueAtTimeTests(persistenceMaker, t)
}
func TestGetValueAtTime(t *testing.T) {
testGetValueAtTime(t)
}
func BenchmarkGetValueAtTime(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetValueAtTime(b)
}
}
func testGetBoundaryValues(t test.Tester) {
persistenceMaker := func() (metric.MetricPersistence, io.Closer) {
return NewMemorySeriesStorage(), ioutil.NopCloser(nil)
}
metric.GetBoundaryValuesTests(persistenceMaker, t)
}
func TestGetBoundaryValues(t *testing.T) {
testGetBoundaryValues(t)
}
func BenchmarkGetBoundaryValues(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetBoundaryValues(b)
}
}
func testGetRangeValues(t test.Tester) {
persistenceMaker := func() (metric.MetricPersistence, io.Closer) {
return NewMemorySeriesStorage(), ioutil.NopCloser(nil)
}
metric.GetRangeValuesTests(persistenceMaker, t)
}
func TestGetRangeValues(t *testing.T) {
testGetRangeValues(t)
}
func BenchmarkGetRangeValues(b *testing.B) {
for i := 0; i < b.N; i++ {
testGetRangeValues(b)
}
}

View file

@ -0,0 +1,114 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
"testing"
)
func buildTestPersistence(f func(p metric.MetricPersistence, t test.Tester)) func(t test.Tester) {
return func(t test.Tester) {
p := NewMemorySeriesStorage()
defer func() {
err := p.Close()
if err != nil {
t.Errorf("Anomaly while closing database: %q\n", err)
}
}()
f(p, t)
}
}
var testBasicLifecycle = buildTestPersistence(metric.BasicLifecycleTests)
func TestBasicLifecycle(t *testing.T) {
testBasicLifecycle(t)
}
func BenchmarkBasicLifecycle(b *testing.B) {
for i := 0; i < b.N; i++ {
testBasicLifecycle(b)
}
}
var testReadEmpty = buildTestPersistence(metric.ReadEmptyTests)
func TestReadEmpty(t *testing.T) {
testReadEmpty(t)
}
func BenchmarkReadEmpty(b *testing.B) {
for i := 0; i < b.N; i++ {
testReadEmpty(b)
}
}
var testAppendSampleAsPureSparseAppend = buildTestPersistence(metric.AppendSampleAsPureSparseAppendTests)
func TestAppendSampleAsPureSparseAppend(t *testing.T) {
testAppendSampleAsPureSparseAppend(t)
}
func BenchmarkAppendSampleAsPureSparseAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsPureSparseAppend(b)
}
}
var testAppendSampleAsSparseAppendWithReads = buildTestPersistence(metric.AppendSampleAsSparseAppendWithReadsTests)
func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
testAppendSampleAsSparseAppendWithReads(t)
}
func BenchmarkAppendSampleAsSparseAppendWithReads(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsSparseAppendWithReads(b)
}
}
var testAppendSampleAsPureSingleEntityAppend = buildTestPersistence(metric.AppendSampleAsPureSingleEntityAppendTests)
func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) {
testAppendSampleAsPureSingleEntityAppend(t)
}
func BenchmarkAppendSampleAsPureSingleEntityAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
testAppendSampleAsPureSingleEntityAppend(b)
}
}
func testStochastic(t test.Tester) {
persistenceMaker := func() metric.MetricPersistence {
return NewMemorySeriesStorage()
}
metric.StochasticTests(persistenceMaker, t)
}
func TestStochastic(t *testing.T) {
testStochastic(t)
}
func BenchmarkStochastic(b *testing.B) {
for i := 0; i < b.N; i++ {
testStochastic(b)
}
}

View file

@ -0,0 +1,58 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility/test"
"time"
)
func GetFingerprintsForLabelSetUsesAndForLabelMatchingTests(p MetricPersistence, t test.Tester) {
metrics := []map[string]string{
{"name": "request_metrics_latency_equal_tallying_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{"name": "requests_metrics_latency_equal_accumulating_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{"name": "requests_metrics_latency_logarithmic_accumulating_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{"name": "requests_metrics_latency_logarithmic_tallying_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{"name": "targets_healthy_scrape_latency_ms", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
}
for _, metric := range metrics {
m := model.Metric{}
for k, v := range metric {
m[model.LabelName(k)] = model.LabelValue(v)
}
appendSample(p, model.Sample{
Value: model.SampleValue(0.0),
Timestamp: time.Now(),
Metric: m,
}, t)
}
labelSet := model.LabelSet{
"name": "targets_healthy_scrape_latency_ms",
"percentile": "0.010000",
}
fingerprints, err := p.GetFingerprintsForLabelSet(labelSet)
if err != nil {
t.Errorf("could not get labels: %s", err)
}
if len(fingerprints) != 1 {
t.Errorf("did not get a single metric as is expected, got %s", fingerprints)
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,433 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility/test"
"math"
"math/rand"
"testing/quick"
"time"
)
const (
stochasticMaximumVariance = 8
)
func BasicLifecycleTests(p MetricPersistence, t test.Tester) {
if p == nil {
t.Errorf("Received nil Metric Persistence.\n")
return
}
}
func ReadEmptyTests(p MetricPersistence, t test.Tester) {
hasLabelPair := func(x int) (success bool) {
name := model.LabelName(string(x))
value := model.LabelValue(string(x))
labelSet := model.LabelSet{
name: value,
}
fingerprints, err := p.GetFingerprintsForLabelSet(labelSet)
if err != nil {
t.Error(err)
return
}
success = len(fingerprints) == 0
if !success {
t.Errorf("unexpected fingerprint length %d, got %d", 0, len(fingerprints))
}
return
}
err := quick.Check(hasLabelPair, nil)
if err != nil {
t.Error(err)
return
}
hasLabelName := func(x int) (success bool) {
labelName := model.LabelName(string(x))
fingerprints, err := p.GetFingerprintsForLabelName(labelName)
if err != nil {
t.Error(err)
return
}
success = len(fingerprints) == 0
if !success {
t.Errorf("unexpected fingerprint length %d, got %d", 0, len(fingerprints))
}
return
}
err = quick.Check(hasLabelName, nil)
if err != nil {
t.Error(err)
return
}
}
func AppendSampleAsPureSparseAppendTests(p MetricPersistence, t test.Tester) {
appendSample := func(x int) (success bool) {
v := model.SampleValue(x)
ts := time.Unix(int64(x), int64(x))
labelName := model.LabelName(x)
labelValue := model.LabelValue(x)
l := model.Metric{labelName: labelValue}
sample := model.Sample{
Value: v,
Timestamp: ts,
Metric: l,
}
err := p.AppendSample(sample)
success = err == nil
if !success {
t.Error(err)
}
return
}
if err := quick.Check(appendSample, nil); err != nil {
t.Error(err)
}
}
func AppendSampleAsSparseAppendWithReadsTests(p MetricPersistence, t test.Tester) {
appendSample := func(x int) (success bool) {
v := model.SampleValue(x)
ts := time.Unix(int64(x), int64(x))
labelName := model.LabelName(x)
labelValue := model.LabelValue(x)
l := model.Metric{labelName: labelValue}
sample := model.Sample{
Value: v,
Timestamp: ts,
Metric: l,
}
err := p.AppendSample(sample)
if err != nil {
t.Error(err)
return
}
fingerprints, err := p.GetFingerprintsForLabelName(labelName)
if err != nil {
t.Error(err)
return
}
if len(fingerprints) != 1 {
t.Errorf("expected fingerprint count of %d, got %d", 1, len(fingerprints))
return
}
fingerprints, err = p.GetFingerprintsForLabelSet(model.LabelSet{
labelName: labelValue,
})
if err != nil {
t.Error(err)
return
}
if len(fingerprints) != 1 {
t.Error("expected fingerprint count of %d, got %d", 1, len(fingerprints))
return
}
return true
}
if err := quick.Check(appendSample, nil); err != nil {
t.Error(err)
}
}
func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Tester) {
appendSample := func(x int) bool {
sample := model.Sample{
Value: model.SampleValue(x),
Timestamp: time.Unix(int64(x), 0),
Metric: model.Metric{"name": "my_metric"},
}
err := p.AppendSample(sample)
return err == nil
}
if err := quick.Check(appendSample, nil); err != nil {
t.Error(err)
}
}
func StochasticTests(persistenceMaker func() MetricPersistence, t test.Tester) {
stochastic := func(x int) (success bool) {
p := persistenceMaker()
defer func() {
err := p.Close()
if err != nil {
t.Error(err)
}
}()
seed := rand.NewSource(int64(x))
random := rand.New(seed)
numberOfMetrics := random.Intn(stochasticMaximumVariance) + 1
numberOfSharedLabels := random.Intn(stochasticMaximumVariance)
numberOfUnsharedLabels := random.Intn(stochasticMaximumVariance)
numberOfSamples := random.Intn(stochasticMaximumVariance) + 2
numberOfRangeScans := random.Intn(stochasticMaximumVariance)
metricTimestamps := map[int]map[int64]bool{}
metricEarliestSample := map[int]int64{}
metricNewestSample := map[int]int64{}
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
sample := model.Sample{
Metric: model.Metric{},
}
v := model.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex))
sample.Metric["name"] = v
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
l := model.LabelName(fmt.Sprintf("shared_label_%d", sharedLabelIndex))
v := model.LabelValue(fmt.Sprintf("label_%d", sharedLabelIndex))
sample.Metric[l] = v
}
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
l := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex))
v := model.LabelValue(fmt.Sprintf("private_label_%d", unsharedLabelIndex))
sample.Metric[l] = v
}
timestamps := map[int64]bool{}
metricTimestamps[metricIndex] = timestamps
var (
newestSample int64 = math.MinInt64
oldestSample int64 = math.MaxInt64
nextTimestamp func() int64
)
nextTimestamp = func() int64 {
var candidate int64
candidate = random.Int63n(math.MaxInt32 - 1)
if _, has := timestamps[candidate]; has {
// WART
candidate = nextTimestamp()
}
timestamps[candidate] = true
if candidate < oldestSample {
oldestSample = candidate
}
if candidate > newestSample {
newestSample = candidate
}
return candidate
}
for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ {
sample.Timestamp = time.Unix(nextTimestamp(), 0)
sample.Value = model.SampleValue(sampleIndex)
err := p.AppendSample(sample)
if err != nil {
t.Error(err)
return
}
}
metricEarliestSample[metricIndex] = oldestSample
metricNewestSample[metricIndex] = newestSample
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
labelPair := model.LabelSet{
model.LabelName(fmt.Sprintf("shared_label_%d", sharedLabelIndex)): model.LabelValue(fmt.Sprintf("label_%d", sharedLabelIndex)),
}
fingerprints, err := p.GetFingerprintsForLabelSet(labelPair)
if err != nil {
t.Error(err)
return
}
if len(fingerprints) == 0 {
t.Errorf("expected fingerprint count of %d, got %d", 0, len(fingerprints))
return
}
labelName := model.LabelName(fmt.Sprintf("shared_label_%d", sharedLabelIndex))
fingerprints, err = p.GetFingerprintsForLabelName(labelName)
if err != nil {
t.Error(err)
return
}
if len(fingerprints) == 0 {
t.Errorf("expected fingerprint count of %d, got %d", 0, len(fingerprints))
return
}
}
}
for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ {
labelName := model.LabelName(fmt.Sprintf("shared_label_%d", sharedIndex))
fingerprints, err := p.GetFingerprintsForLabelName(labelName)
if err != nil {
t.Error(err)
return
}
if len(fingerprints) != numberOfMetrics {
t.Errorf("expected fingerprint count of %d, got %d", numberOfMetrics, len(fingerprints))
return
}
}
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
labelName := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex))
labelValue := model.LabelValue(fmt.Sprintf("private_label_%d", unsharedLabelIndex))
labelSet := model.LabelSet{
labelName: labelValue,
}
fingerprints, err := p.GetFingerprintsForLabelSet(labelSet)
if err != nil {
t.Error(err)
return
}
if len(fingerprints) != 1 {
t.Errorf("expected fingerprint count of %d, got %d", 1, len(fingerprints))
return
}
fingerprints, err = p.GetFingerprintsForLabelName(labelName)
if err != nil {
t.Error(err)
return
}
if len(fingerprints) != 1 {
t.Errorf("expected fingerprint count of %d, got %d", 1, len(fingerprints))
return
}
}
metric := model.Metric{}
metric["name"] = model.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex))
for i := 0; i < numberOfSharedLabels; i++ {
l := model.LabelName(fmt.Sprintf("shared_label_%d", i))
v := model.LabelValue(fmt.Sprintf("label_%d", i))
metric[l] = v
}
for i := 0; i < numberOfUnsharedLabels; i++ {
l := model.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i))
v := model.LabelValue(fmt.Sprintf("private_label_%d", i))
metric[l] = v
}
for i := 0; i < numberOfRangeScans; i++ {
timestamps := metricTimestamps[metricIndex]
var first int64 = 0
var second int64 = 0
for {
firstCandidate := random.Int63n(int64(len(timestamps)))
secondCandidate := random.Int63n(int64(len(timestamps)))
smallest := int64(-1)
largest := int64(-1)
if firstCandidate == secondCandidate {
continue
} else if firstCandidate > secondCandidate {
largest = firstCandidate
smallest = secondCandidate
} else {
largest = secondCandidate
smallest = firstCandidate
}
j := int64(0)
for i := range timestamps {
if j == smallest {
first = i
} else if j == largest {
second = i
break
}
j++
}
break
}
begin := first
end := second
if second < first {
begin, end = second, first
}
interval := model.Interval{
OldestInclusive: time.Unix(begin, 0),
NewestInclusive: time.Unix(end, 0),
}
samples, err := p.GetRangeValues(metric, interval)
if err != nil {
t.Error(err)
return
}
if len(samples.Values) < 2 {
t.Errorf("expected sample count less than %d, got %d", 2, len(samples.Values))
return
}
}
}
return true
}
if err := quick.Check(stochastic, nil); err != nil {
t.Error(err)
}
}

View file

@ -0,0 +1,26 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility/test"
)
func appendSample(p MetricPersistence, s model.Sample, t test.Tester) {
err := p.AppendSample(s)
if err != nil {
t.Fatal(err)
}
}