Initial commit of external resources.

This commit is contained in:
Matt T. Proud 2012-11-24 12:33:34 +01:00
parent 734d28b515
commit 6072143505
24 changed files with 2841 additions and 2 deletions

23
Makefile Normal file
View file

@ -0,0 +1,23 @@
TEST_ARTIFACTS = prometheus
all: test
test: build
go test ./...
build:
$(MAKE) -C model
go build ./...
go build .
clean:
rm -rf $(TEST_ARTIFACTS)
$(MAKE) -C model clean
-find . -type f -iname '*~' -exec rm '{}' ';'
-find . -type f -iname '*#' -exec rm '{}' ';'
-find . -type f -iname '.*' -exec rm '{}' ';'
format:
find . -iname '*.go' | grep -v generated | xargs -n1 gofmt -w
.PHONY: build clean format test

View file

@ -1,4 +1,22 @@
prometheus Prometheus
========== ==========
Bedecke deinen Himmel, Zeus! Bedecke deinen Himmel, Zeus! A new kid is in town.
Prerequisites
=============
1. Go 1.0.X.
2. LevelDB: (https://code.google.com/p/leveldb/).
3. Protocol Buffers Compiler: (http://code.google.com/p/protobuf/).
4. goprotobuf: the code generator and runtime library: (http://code.google.com/p/goprotobuf/).
5. Levigo, a Go-wrapper around LevelDB's C library: (https://github.com/jmhodges/levigo).
Initial Hurdles
===============
1. A bit of this grew organically without an easy way of binding it all together. The tests will pass but slowly. They were not optimized for speed but end-to-end coverage of the whole storage model. This is something immediate to fix.
2. Protocol Buffer generator for Go changed emitted output API. This will need to be fixed before other contributors can participate.
Milestones
==========
1. In-memory archive, basic rule language, simple computation engine, and naive exposition system.

26
coding/indexable/time.go Normal file
View file

@ -0,0 +1,26 @@
package indexable
import (
"encoding/binary"
"time"
)
var (
EarliestTime = EncodeTime(time.Unix(0, 0))
)
func EncodeTimeInto(dst []byte, t time.Time) {
binary.BigEndian.PutUint64(dst, uint64(t.Unix()))
}
func EncodeTime(t time.Time) []byte {
buffer := make([]byte, 8)
EncodeTimeInto(buffer, t)
return buffer
}
func DecodeTime(src []byte) time.Time {
return time.Unix(int64(binary.BigEndian.Uint64(src)), 0)
}

View file

@ -0,0 +1,25 @@
package indexable
import (
"math/rand"
"testing"
"testing/quick"
"time"
)
func TestTimeEndToEnd(t *testing.T) {
tester := func(x int) bool {
random := rand.New(rand.NewSource(int64(x)))
buffer := make([]byte, 8)
incoming := time.Unix(random.Int63(), 0)
EncodeTimeInto(buffer, incoming)
outgoing := DecodeTime(buffer)
return incoming.Equal(outgoing) && incoming.Unix() == outgoing.Unix()
}
if err := quick.Check(tester, nil); err != nil {
t.Error(err)
}
}

View file

@ -0,0 +1 @@
mtp@Matt.local.225

View file

@ -0,0 +1,61 @@
// {
// set evaluation_interval = "30s";
// target "http://www.example.com:80/metrics"
// rule archived {name="instance:requests_total:sum"} = sum by (instance) {name="requests"};
// rule archived {name="instance:requests-by_result_code_total:sum"} =
// sum by (instance,result_code) {name="requests"};
// rule archived {name="instance:requests-by_result_code:sum"} =
// {name="instances:requests-by_result_code"}
// / by (instance)
// {name="instances:requests_total:sum"};
// }
{
set evaluation_interval = “2m”;
permanent {
rule {
labels {
set name = “process:request_rate_qps-allowed:sum”;
set job = “frontend”;
}
// Values may be a literal or an expression.
set value = 500;
// I wonder: Is it practical to express labels similar to above in a better DSL?
// set value = EXPRESSION … WITH LABELS {foo=”bar”};
}
}
rule {
// This provides a way of overriding existing labels, unsetting them, or
// appending new ones.
labels {
// “name” is obligatory.
set name = “process:requests_total:sum”;
}
// Here we are extracting a metric with the name label value of “requests” from
// job “frontend” and merely accumulating this into a named variable. It is
// similar to standing. Each sum is keyed to the UNIX process and job from which
// it came.
set value = SUM({name=”requests”, job=”frontend”}) BY (process, job);
}
rule {
// This provides a way of overriding existing labels, unsetting them, or
// appending new ones.
labels {
// “name” is obligatory.
set name = “process:request_qps:rate5m”;
}
// Here we are extracting a metric with the name label value of “requests” from
// job “frontend” and merely accumulating this into a named variable. It is
// similar to standing.
set value = RATE({name=”process:requests_total:sum”, job=”frontend”} OVER “5m”);
}
}
}

View file

@ -0,0 +1,506 @@
package main
import (
"fmt"
"strings"
"unicode"
"unicode/utf8"
)
type itemType int
const (
itemError itemType = iota
itemEof
itemRuleContextOpen
itemRuleContextClose
itemSetKeyword
itemKey
itemEqual
itemValue
itemSemicolon
itemQuote
itemRulesKeyword
itemRuleKeyword
itemRuleName
itemRuleValue
itemOpenBracket
itemCloseBracket
)
const (
literalCloseBracket = "}"
literalEof = -1
literalOpenBracket = "{"
literalRule = "rule"
literalRules = "rules"
literalSet = "set"
literalEqual = "="
literalQuote = `"`
literalSemicolon = ";"
)
type element struct {
itemType itemType
value string
}
func (e element) String() string {
switch e.itemType {
case itemError:
return e.value
case itemEof:
return "EOF"
}
return fmt.Sprintf("%s %q", e.itemType, e.value)
}
type lexer struct {
input string
elements chan element
start int
position int
runeWidth int
}
func lex(name, input string) (*lexer, chan element) {
l := &lexer{
input: input,
elements: make(chan element),
}
go l.run()
return l, l.elements
}
func (l *lexer) run() {
for state := lexBody; state != nil; {
state = state(l)
}
close(l.elements)
}
func (l *lexer) next() (rune rune) {
if l.position >= len(l.input) {
l.runeWidth = 0
return literalEof
}
rune, l.runeWidth = utf8.DecodeRuneInString(l.input[l.position:])
l.position += l.runeWidth
return rune
}
func lexBody(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalOpenBracket) {
return lexRulesetOpen
}
switch rune := l.next(); {
case rune == literalEof:
l.emit(itemEof)
return nil
case isSpace(rune):
l.ignore()
default:
return l.errorf("illegal input")
}
return lexBody
}
func lexRulesetOpen(l *lexer) lexFunction {
l.position += len(literalOpenBracket)
l.emit(itemRuleContextOpen)
return lexRulesetInside
}
func lexRulesetInside(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalCloseBracket) {
return lexRulesetClose
}
if strings.HasPrefix(l.input[l.position:], literalSet) {
return lexRuleSetKeyword
}
if strings.HasPrefix(l.input[l.position:], literalRules) {
return lexRulesetRules
}
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("unterminated ruleset")
case isSpace(rune):
l.ignore()
case rune == ';':
l.ignore()
default:
return l.errorf("unrecognized input")
}
return lexRulesetInside
}
func lexRulesetRules(l *lexer) lexFunction {
l.position += len(literalRules)
l.emit(itemRulesKeyword)
return lexRulesetRulesBlockOpen
}
func lexRulesetRulesBlockOpen(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalOpenBracket) {
l.position += len(literalOpenBracket)
l.emit(itemOpenBracket)
return lexRulesetRulesBlockInside
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unrecognized input")
}
return lexRulesetRulesBlockOpen
}
func lexRulesetRulesBlockInside(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalRule) {
return lexRulesetRuleBegin
}
if strings.HasPrefix(l.input[l.position:], literalCloseBracket) {
return lexRulesetRulesBlockClose
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unrecognized input")
}
return lexRulesetRulesBlockInside
}
func lexRulesetRulesBlockClose(l *lexer) lexFunction {
l.position += len(literalCloseBracket)
l.emit(itemCloseBracket)
return lexRulesetInside
}
func lexRulesetRuleBegin(l *lexer) lexFunction {
l.position += len(literalRule)
l.emit(itemRuleKeyword)
return lexRulesetRuleName
}
func lexRulesetRuleName(l *lexer) lexFunction {
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
case isIdentifierOpen(rune):
for {
switch rune := l.next(); {
case isMetricIdentifier(rune):
case rune == '=':
l.backup()
l.emit(itemRuleName)
return lexRulesetRuleEqual
default:
return l.errorf("bad rule name")
}
}
default:
return l.errorf("unrecognized input")
}
return lexRulesetRuleName
}
func lexRulesetRuleEqual(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalEqual) {
l.position += len(literalEqual)
l.emit(itemEqual)
return lexRulesetRuleDefinitionBegin
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unrecognized input")
}
return lexRulesetRuleEqual
}
func lexRulesetRuleDefinitionBegin(l *lexer) lexFunction {
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
case isIdentifierOpen(rune):
for {
switch rune := l.next(); {
case isMetricIdentifier(rune):
case rune == ';':
l.emit(itemRuleValue)
return lexRulesetRulesBlockInside
default:
return l.errorf("unrecognized input")
}
}
default:
return l.errorf("unrecognized input")
}
return lexRulesetRuleDefinitionBegin
}
func lexRuleSetKeyword(l *lexer) lexFunction {
l.position += len(literalSet)
l.emit(itemSetKeyword)
return lexRuleSetInside
}
func (l *lexer) backup() {
l.position -= l.runeWidth
}
func isIdentifierOpen(rune rune) bool {
switch rune := rune; {
case unicode.IsLetter(rune):
return true
case rune == '_':
return true
}
return false
}
func lexRuleSetInside(l *lexer) lexFunction {
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("unterminated set statement")
case isSpace(rune):
l.ignore()
case rune == ';':
return l.errorf("unexpected ;")
case rune == '=':
return l.errorf("unexpected =")
case isIdentifierOpen(rune):
l.backup()
return lexRuleSetKey
default:
return l.errorf("unrecognized input")
}
return lexRuleSetInside
}
func isIdentifier(rune rune) bool {
switch rune := rune; {
case isIdentifierOpen(rune):
return true
case unicode.IsDigit(rune):
return true
}
return false
}
func isMetricIdentifier(rune rune) bool {
switch rune := rune; {
case isIdentifier(rune):
return true
case rune == ':':
return true
}
return false
}
func (l *lexer) peek() rune {
rune := l.next()
l.backup()
return rune
}
func (l *lexer) atTerminator() bool {
switch rune := l.peek(); {
case isSpace(rune):
return true
case rune == ';':
return true
}
return false
}
func lexRuleSetKey(l *lexer) lexFunction {
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("incomplete set statement")
case isIdentifier(rune):
default:
l.backup()
if !l.atTerminator() {
return l.errorf("unexpected character %+U %q", rune, rune)
}
l.emit(itemKey)
return lexRuleSetEqual
}
return lexRuleSetKey
}
func lexRuleSetEqual(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalEqual) {
l.position += len(literalEqual)
l.emit(itemEqual)
return lexRuleSetValueOpenQuote
}
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("incomplete set statement")
case isSpace(rune):
l.ignore()
default:
return l.errorf("unexpected character %+U %q", rune, rune)
}
return lexRuleSetEqual
}
func lexRuleSetValueOpenQuote(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalQuote) {
l.position += len(literalQuote)
l.emit(itemQuote)
return lexRuleSetValue
}
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("incomplete set statement")
case isSpace(rune):
l.ignore()
default:
return l.errorf("unexpected character %+U %q", rune, rune)
}
return lexRuleSetValueOpenQuote
}
func lexRuleSetValue(l *lexer) lexFunction {
var lastRuneEscapes bool = false
for {
rune := l.next()
{
if rune == '"' && !lastRuneEscapes {
l.backup()
l.emit(itemValue)
return lexRuleSetValueCloseQuote
}
if !lastRuneEscapes && rune == '\\' {
lastRuneEscapes = true
} else {
lastRuneEscapes = false
}
}
}
panic("unreachable")
}
func lexRuleSetValueCloseQuote(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalQuote) {
l.position += len(literalQuote)
l.emit(itemQuote)
return lexRuleSetSemicolon
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unexpected character %+U %q", rune, rune)
}
return lexRuleSetValueCloseQuote
}
func lexRuleSetSemicolon(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalSemicolon) {
l.position += len(literalSemicolon)
l.emit(itemSemicolon)
return lexRulesetInside
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unexpected character %+U %q", rune, rune)
}
return lexRuleSetSemicolon
}
func (l *lexer) ignore() {
l.start = l.position
}
func (l *lexer) errorf(format string, args ...interface{}) lexFunction {
l.elements <- element{itemError, fmt.Sprintf(format, args...)}
return nil
}
func isSpace(rune rune) bool {
switch rune {
case ' ', '\t', '\n', '\r':
return true
}
return false
}
func lexRulesetClose(l *lexer) lexFunction {
l.position += len(literalCloseBracket)
l.emit(itemCloseBracket)
return lexBody
}
func (l *lexer) emit(i itemType) {
l.elements <- element{i, l.input[l.start:l.position]}
l.start = l.position
}
type lexFunction func(*lexer) lexFunction
func main() {
in := `{
set evaluation_interval = "10m";
rules {
}
set name = "your mom";
}
{
set evaluation_interval = "30m";
}`
fmt.Println(in)
_, v := lex("", in)
for value := range v {
fmt.Println(value)
}
}

View file

@ -0,0 +1,192 @@
package main
import (
"testing"
)
type lexTest struct {
name string
input string
elements []element
}
var (
tEof = element{itemEof, ""}
tOpenRuleset = element{itemOpenBracket, "{"}
tCloseBracket = element{itemCloseBracket, "}"}
tIllegal = element{itemError, "illegal input"}
tSet = element{itemSetKeyword, "set"}
tEqual = element{itemEqual, "="}
tQuote = element{itemQuote, `"`}
tSemicolon = element{itemSemicolon, ";"}
tRules = element{itemRulesKeyword, "rules"}
)
var lexTests = []lexTest{
{
"empty",
"",
[]element{
tEof,
},
},
{
"empty with new line",
"\n\n",
[]element{
tEof,
},
},
{
"one empty ruleset",
"{}",
[]element{
tOpenRuleset,
tCloseBracket,
tEof,
},
},
{
"one empty ruleset distributed over new line",
"{\n}",
[]element{
tOpenRuleset,
tCloseBracket,
tEof,
},
},
{
"two empty rulesets",
"{} {}",
[]element{
tOpenRuleset,
tCloseBracket,
tOpenRuleset,
tCloseBracket,
tEof,
},
},
{
"two empty rulesets distributed over new line",
"{}\n{}",
[]element{
tOpenRuleset,
tCloseBracket,
tOpenRuleset,
tCloseBracket,
tEof,
},
},
{
"garbage",
"garbage",
[]element{tIllegal},
},
{
"one set",
`{ set foo = "bar"; }`,
[]element{
tOpenRuleset,
tSet,
element{itemKey, "foo"},
tEqual,
tQuote,
element{itemValue, "bar"},
tQuote,
tSemicolon,
tCloseBracket,
tEof,
},
},
{
"one set over multiple lines",
`{
set
foo
=
"bar"
;
}`,
[]element{tOpenRuleset, tSet, element{itemKey, "foo"}, tEqual, tQuote, element{itemValue, "bar"}, tQuote, tSemicolon, tCloseBracket, tEof},
},
{
"two sets",
`{ set foo = "bar";set baz = "qux"; }`,
[]element{
tOpenRuleset,
tSet,
element{itemKey, "foo"},
tEqual,
tQuote,
element{itemValue, "bar"},
tQuote,
tSemicolon,
tSet,
element{itemKey, "baz"},
tEqual,
tQuote,
element{itemValue, "qux"},
tQuote,
tSemicolon,
tCloseBracket,
tEof,
},
},
{
"two over multiple lines",
`{ set foo = "bar";
set
baz
=
"qux"
;
}`,
[]element{
tOpenRuleset,
tSet,
element{itemKey, "foo"},
tEqual,
tQuote,
element{itemValue, "bar"},
tQuote,
tSemicolon,
tSet,
element{itemKey, "baz"},
tEqual,
tQuote,
element{itemValue, "qux"},
tQuote,
tSemicolon,
tCloseBracket,
tEof,
},
},
}
func collect(l *lexTest) []element {
_, v := lex("", l.input)
emission := make([]element, 0)
for i := range v {
emission = append(emission, i)
}
return emission
}
func TestFoo(t *testing.T) {
for _, test := range lexTests {
e := collect(&test)
if len(e) != len(test.elements) {
t.Errorf("%s: got\n\n\t%v\nexpected\n\n\t%v", test.name, e, test.elements)
}
for i, _ := range test.elements {
if test.elements[i] != e[i] {
t.Errorf("%s[%d]: got\n\n\t%v\nexpected\n\n\t%v", test.name, i, e[i], test.elements[i])
}
}
}
}

5
encoder.go Normal file
View file

@ -0,0 +1,5 @@
package main
type Encoder interface {
Encode() ([]byte, error)
}

8
index.go Normal file
View file

@ -0,0 +1,8 @@
package main
type MembershipIndex interface {
Has(key Encoder) (bool, error)
Put(key Encoder) error
Drop(key Encoder) error
Close() error
}

43
levigo_index.go Normal file
View file

@ -0,0 +1,43 @@
package main
import (
data "github.com/matttproud/prometheus/model/generated"
)
type LevigoMembershipIndex struct {
persistence *LevigoPersistence
existenceValue Encoder
}
func (l *LevigoMembershipIndex) Close() error {
return l.persistence.Close()
}
func (l *LevigoMembershipIndex) Has(key Encoder) (bool, error) {
return l.persistence.Has(key)
}
func (l *LevigoMembershipIndex) Drop(key Encoder) error {
return l.persistence.Drop(key)
}
func (l *LevigoMembershipIndex) Put(key Encoder) error {
return l.persistence.Put(key, l.existenceValue)
}
func NewLevigoMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevigoMembershipIndex, error) {
var levigoPersistence *LevigoPersistence
var levigoPersistenceError error
existenceValue := NewProtocolBufferEncoder(&data.MembershipIndexValueDDO{})
if levigoPersistence, levigoPersistenceError = NewLevigoPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded); levigoPersistenceError == nil {
levigoMembershipIndex := &LevigoMembershipIndex{
persistence: levigoPersistence,
existenceValue: existenceValue,
}
return levigoMembershipIndex, nil
}
return nil, levigoPersistenceError
}

View file

@ -0,0 +1,716 @@
package main
import (
"code.google.com/p/goprotobuf/proto"
"errors"
"fmt"
"github.com/matttproud/prometheus/coding/indexable"
data "github.com/matttproud/prometheus/model/generated"
"github.com/matttproud/prometheus/utility"
"log"
"sort"
)
type pendingArchival map[int64]float64
type LevigoMetricPersistence struct {
fingerprintHighWaterMarks *LevigoPersistence
fingerprintLabelPairs *LevigoPersistence
fingerprintLowWaterMarks *LevigoPersistence
fingerprintSamples *LevigoPersistence
labelNameFingerprints *LevigoPersistence
labelPairFingerprints *LevigoPersistence
metricMembershipIndex *LevigoMembershipIndex
}
func (l *LevigoMetricPersistence) Close() error {
log.Printf("Closing LevigoPersistence storage containers...")
var persistences = []struct {
name string
closer LevigoCloser
}{
{
"Fingerprint High-Water Marks",
l.fingerprintHighWaterMarks,
},
{
"Fingerprint to Label Name and Value Pairs",
l.fingerprintLabelPairs,
},
{
"Fingerprint Low-Water Marks",
l.fingerprintLowWaterMarks,
},
{
"Fingerprint Samples",
l.fingerprintSamples,
},
{
"Label Name to Fingerprints",
l.labelNameFingerprints,
},
{
"Label Name and Value Pairs to Fingerprints",
l.labelPairFingerprints,
},
{
"Metric Membership Index",
l.metricMembershipIndex,
},
}
errorChannel := make(chan error, len(persistences))
for _, persistence := range persistences {
name := persistence.name
closer := persistence.closer
if closer != nil {
log.Printf("Closing LevigoPersistence storage container: %s\n", name)
closingError := closer.Close()
if closingError != nil {
log.Printf("Could not close a LevigoPersistence storage container; inconsistencies are possible: %q\n", closingError)
}
errorChannel <- closingError
} else {
errorChannel <- nil
}
}
for i := 0; i < cap(errorChannel); i++ {
closingError := <-errorChannel
if closingError != nil {
return closingError
}
}
log.Printf("Successfully closed all LevigoPersistence storage containers.")
return nil
}
type levigoOpener func()
func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, error) {
log.Printf("Opening LevigoPersistence storage containers...")
errorChannel := make(chan error, 7)
emission := &LevigoMetricPersistence{}
var subsystemOpeners = []struct {
name string
opener levigoOpener
}{
{
"High-Water Marks by Fingerprint",
func() {
var anomaly error
emission.fingerprintHighWaterMarks, anomaly = NewLevigoPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10)
errorChannel <- anomaly
},
},
{
"Label Names and Value Pairs by Fingerprint",
func() {
var anomaly error
emission.fingerprintLabelPairs, anomaly = NewLevigoPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10)
errorChannel <- anomaly
},
},
{
"Low-Water Marks by Fingerprint",
func() {
var anomaly error
emission.fingerprintLowWaterMarks, anomaly = NewLevigoPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10)
errorChannel <- anomaly
},
},
{
"Samples by Fingerprint",
func() {
var anomaly error
emission.fingerprintSamples, anomaly = NewLevigoPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10)
errorChannel <- anomaly
},
},
{
"Fingerprints by Label Name",
func() {
var anomaly error
emission.labelNameFingerprints, anomaly = NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10)
errorChannel <- anomaly
},
},
{
"Fingerprints by Label Name and Value Pair",
func() {
var anomaly error
emission.labelPairFingerprints, anomaly = NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10)
errorChannel <- anomaly
},
},
{
"Metric Membership Index",
func() {
var anomaly error
emission.metricMembershipIndex, anomaly = NewLevigoMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10)
errorChannel <- anomaly
},
},
}
for _, subsystem := range subsystemOpeners {
name := subsystem.name
opener := subsystem.opener
log.Printf("Opening LevigoPersistence storage container: %s\n", name)
go opener()
}
for i := 0; i < cap(errorChannel); i++ {
openingError := <-errorChannel
if openingError != nil {
log.Printf("Could not open a LevigoPersistence storage container: %q\n", openingError)
return nil, openingError
}
}
log.Printf("Successfully opened all LevigoPersistence storage containers.\n")
return emission, nil
}
func ddoFromSample(sample *Sample) *data.MetricDDO {
labelNames := make([]string, 0, len(sample.Labels))
for labelName, _ := range sample.Labels {
labelNames = append(labelNames, string(labelName))
}
sort.Strings(labelNames)
labelPairs := make([]*data.LabelPairDDO, 0, len(sample.Labels))
for _, labelName := range labelNames {
labelValue := sample.Labels[labelName]
labelPair := &data.LabelPairDDO{
Name: proto.String(string(labelName)),
Value: proto.String(string(labelValue)),
}
labelPairs = append(labelPairs, labelPair)
}
metricDDO := &data.MetricDDO{
LabelPair: labelPairs,
}
return metricDDO
}
func ddoFromMetric(metric Metric) *data.MetricDDO {
labelNames := make([]string, 0, len(metric))
for labelName, _ := range metric {
labelNames = append(labelNames, string(labelName))
}
sort.Strings(labelNames)
labelPairs := make([]*data.LabelPairDDO, 0, len(metric))
for _, labelName := range labelNames {
labelValue := metric[labelName]
labelPair := &data.LabelPairDDO{
Name: proto.String(string(labelName)),
Value: proto.String(string(labelValue)),
}
labelPairs = append(labelPairs, labelPair)
}
metricDDO := &data.MetricDDO{
LabelPair: labelPairs,
}
return metricDDO
}
func fingerprintDDOFromByteArray(fingerprint []byte) *data.FingerprintDDO {
fingerprintDDO := &data.FingerprintDDO{
Signature: proto.String(string(fingerprint)),
}
return fingerprintDDO
}
func (l *LevigoMetricPersistence) hasIndexMetric(ddo *data.MetricDDO) (bool, error) {
ddoKey := NewProtocolBufferEncoder(ddo)
return l.metricMembershipIndex.Has(ddoKey)
}
func (l *LevigoMetricPersistence) indexMetric(ddo *data.MetricDDO) error {
ddoKey := NewProtocolBufferEncoder(ddo)
return l.metricMembershipIndex.Put(ddoKey)
}
func fingerprintDDOForMessage(message proto.Message) (*data.FingerprintDDO, error) {
if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil {
fingerprint := FingerprintFromByteArray(messageByteArray)
return &data.FingerprintDDO{
Signature: proto.String(string(fingerprint)),
}, nil
} else {
return nil, marshalError
}
return nil, errors.New("Unknown error in generating FingerprintDDO from message.")
}
func (l *LevigoMetricPersistence) HasLabelPair(ddo *data.LabelPairDDO) (bool, error) {
ddoKey := NewProtocolBufferEncoder(ddo)
return l.labelPairFingerprints.Has(ddoKey)
}
func (l *LevigoMetricPersistence) HasLabelName(ddo *data.LabelNameDDO) (bool, error) {
ddoKey := NewProtocolBufferEncoder(ddo)
return l.labelNameFingerprints.Has(ddoKey)
}
func (l *LevigoMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairDDO) (*data.FingerprintCollectionDDO, error) {
ddoKey := NewProtocolBufferEncoder(ddo)
if get, getError := l.labelPairFingerprints.Get(ddoKey); getError == nil {
value := &data.FingerprintCollectionDDO{}
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
return value, nil
} else {
return nil, unmarshalError
}
} else {
return nil, getError
}
return nil, errors.New("Unknown error while getting label name and value pair fingerprints.")
}
func (l *LevigoMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameDDO) (*data.FingerprintCollectionDDO, error) {
ddoKey := NewProtocolBufferEncoder(ddo)
if get, getError := l.labelNameFingerprints.Get(ddoKey); getError == nil {
value := &data.FingerprintCollectionDDO{}
if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil {
return value, nil
} else {
return nil, unmarshalError
}
} else {
return nil, getError
}
return nil, errors.New("Unknown error while getting label name fingerprints.")
}
func (l *LevigoMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPairDDO, fingerprints *data.FingerprintCollectionDDO) error {
labelPairEncoded := NewProtocolBufferEncoder(labelPair)
fingerprintsEncoded := NewProtocolBufferEncoder(fingerprints)
return l.labelPairFingerprints.Put(labelPairEncoded, fingerprintsEncoded)
}
func (l *LevigoMetricPersistence) setLabelNameFingerprints(labelName *data.LabelNameDDO, fingerprints *data.FingerprintCollectionDDO) error {
labelNameEncoded := NewProtocolBufferEncoder(labelName)
fingerprintsEncoded := NewProtocolBufferEncoder(fingerprints)
return l.labelNameFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
}
func (l *LevigoMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error {
if has, hasError := l.HasLabelPair(labelPair); hasError == nil {
var fingerprints *data.FingerprintCollectionDDO
if has {
if existing, existingError := l.GetLabelPairFingerprints(labelPair); existingError == nil {
fingerprints = existing
} else {
return existingError
}
} else {
fingerprints = &data.FingerprintCollectionDDO{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelPairFingerprints(labelPair, fingerprints)
} else {
return hasError
}
return errors.New("Unknown error when appending fingerprint to label name and value pair.")
}
func (l *LevigoMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error {
labelName := &data.LabelNameDDO{
Name: labelPair.Name,
}
if has, hasError := l.HasLabelName(labelName); hasError == nil {
var fingerprints *data.FingerprintCollectionDDO
if has {
if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil {
fingerprints = existing
} else {
return existingError
}
} else {
fingerprints = &data.FingerprintCollectionDDO{}
}
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelNameFingerprints(labelName, fingerprints)
} else {
return hasError
}
return errors.New("Unknown error when appending fingerprint to label name and value pair.")
}
func (l *LevigoMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error {
if fingerprintDDO, fingerprintDDOError := fingerprintDDOForMessage(ddo); fingerprintDDOError == nil {
labelPairCollectionDDO := &data.LabelPairCollectionDDO{
Member: ddo.LabelPair,
}
fingerprintKey := NewProtocolBufferEncoder(fingerprintDDO)
labelPairCollectionDDOEncoder := NewProtocolBufferEncoder(labelPairCollectionDDO)
if putError := l.fingerprintLabelPairs.Put(fingerprintKey, labelPairCollectionDDOEncoder); putError == nil {
labelCount := len(ddo.LabelPair)
labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount)
for _, labelPair := range ddo.LabelPair {
go func(labelPair *data.LabelPairDDO) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDDO)
}(labelPair)
go func(labelPair *data.LabelPairDDO) {
labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDDO)
}(labelPair)
}
for i := 0; i < cap(labelPairErrors); i++ {
appendError := <-labelPairErrors
if appendError != nil {
return appendError
}
}
for i := 0; i < cap(labelNameErrors); i++ {
appendError := <-labelNameErrors
if appendError != nil {
return appendError
}
}
return nil
} else {
return putError
}
} else {
return fingerprintDDOError
}
return errors.New("Unknown error in appending label pairs to fingerprint.")
}
func (l *LevigoMetricPersistence) AppendSample(sample *Sample) error {
fmt.Printf("Sample: %q\n", sample)
metricDDO := ddoFromSample(sample)
if indexHas, indexHasError := l.hasIndexMetric(metricDDO); indexHasError == nil {
if !indexHas {
if indexPutError := l.indexMetric(metricDDO); indexPutError == nil {
if appendError := l.appendFingerprints(metricDDO); appendError != nil {
log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError)
return appendError
}
} else {
log.Printf("Could not add metric to membership index: %q\n", indexPutError)
return indexPutError
}
}
} else {
log.Printf("Could not query membership index for metric: %q\n", indexHasError)
return indexHasError
}
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
sampleKeyDDO := &data.SampleKeyDDO{
Fingerprint: fingerprintDDO,
Timestamp: indexable.EncodeTime(sample.Timestamp),
}
sampleValueDDO := &data.SampleValueDDO{
Value: proto.Float32(float32(sample.Value)),
}
sampleKeyEncoded := NewProtocolBufferEncoder(sampleKeyDDO)
sampleValueEncoded := NewProtocolBufferEncoder(sampleValueDDO)
if putError := l.fingerprintSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil {
log.Printf("Could not append metric sample: %q\n", putError)
return putError
}
} else {
log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDDOErr)
return fingerprintDDOErr
}
return nil
}
func (l *LevigoMetricPersistence) GetLabelNames() ([]string, error) {
if getAll, getAllError := l.labelNameFingerprints.GetAll(); getAllError == nil {
result := make([]string, 0, len(getAll))
labelNameDDO := &data.LabelNameDDO{}
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelNameDDO); unmarshalError == nil {
result = append(result, *labelNameDDO.Name)
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying label names.")
}
func (l *LevigoMetricPersistence) GetLabelPairs() ([]LabelPairs, error) {
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil {
result := make([]LabelPairs, 0, len(getAll))
labelPairDDO := &data.LabelPairDDO{}
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelPairDDO); unmarshalError == nil {
item := LabelPairs{
*labelPairDDO.Name: *labelPairDDO.Value,
}
result = append(result, item)
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying label pairs.")
}
func (l *LevigoMetricPersistence) GetMetrics() ([]LabelPairs, error) {
log.Printf("GetMetrics()\n")
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil {
log.Printf("getAll: %q\n", getAll)
result := make([]LabelPairs, 0)
fingerprintCollection := &data.FingerprintCollectionDDO{}
fingerprints := make(utility.Set)
for _, pair := range getAll {
log.Printf("pair: %q\n", pair)
if unmarshalError := proto.Unmarshal(pair.Right, fingerprintCollection); unmarshalError == nil {
for _, member := range fingerprintCollection.Member {
log.Printf("member: %q\n", member)
if !fingerprints.Has(*member.Signature) {
log.Printf("!Has: %q\n", member.Signature)
fingerprints.Add(*member.Signature)
log.Printf("fingerprints %q\n", fingerprints)
fingerprintEncoded := NewProtocolBufferEncoder(member)
if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintLabelPairs.Get(fingerprintEncoded); labelPairCollectionRawError == nil {
log.Printf("labelPairCollectionRaw: %q\n", labelPairCollectionRaw)
labelPairCollectionDDO := &data.LabelPairCollectionDDO{}
if labelPairCollectionDDOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDDO); labelPairCollectionDDOMarshalError == nil {
intermediate := make(LabelPairs, 0)
for _, member := range labelPairCollectionDDO.Member {
intermediate[*member.Name] = *member.Value
}
log.Printf("intermediate: %q\n", intermediate)
result = append(result, intermediate)
} else {
return nil, labelPairCollectionDDOMarshalError
}
} else {
return nil, labelPairCollectionRawError
}
}
}
} else {
return nil, unmarshalError
}
}
return result, nil
} else {
return nil, getAllError
}
return nil, errors.New("Unknown error encountered when querying metrics.")
}
func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interval, int, error) {
metricDDO := ddoFromMetric(metric)
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
start := &data.SampleKeyDDO{
Fingerprint: fingerprintDDO,
Timestamp: indexable.EarliestTime,
}
if encode, encodeErr := NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
if iterator.Valid() {
found := &data.SampleKeyDDO{}
if unmarshalErr := proto.Unmarshal(iterator.Key(), found); unmarshalErr == nil {
var foundEntries int = 0
if *fingerprintDDO.Signature == *found.Fingerprint.Signature {
emission := &Interval{
OldestInclusive: indexable.DecodeTime(found.Timestamp),
NewestInclusive: indexable.DecodeTime(found.Timestamp),
}
for iterator = iterator; iterator.Valid(); iterator.Next() {
if subsequentUnmarshalErr := proto.Unmarshal(iterator.Key(), found); subsequentUnmarshalErr == nil {
if *fingerprintDDO.Signature != *found.Fingerprint.Signature {
return emission, foundEntries, nil
}
foundEntries++
log.Printf("b foundEntries++ %d\n", foundEntries)
emission.NewestInclusive = indexable.DecodeTime(found.Timestamp)
} else {
log.Printf("Could not de-serialize subsequent key: %q\n", subsequentUnmarshalErr)
return nil, -7, subsequentUnmarshalErr
}
}
return emission, foundEntries, nil
} else {
return &Interval{}, -6, nil
}
} else {
log.Printf("Could not de-serialize start key: %q\n", unmarshalErr)
return nil, -5, unmarshalErr
}
} else {
iteratorErr := iterator.GetError()
log.Printf("Could not seek for metric watermark beginning: %q\n", iteratorErr)
return nil, -4, iteratorErr
}
} else {
log.Printf("Could not seek for metric watermark: %q\n", encodeErr)
return nil, -3, encodeErr
}
} else {
if closer != nil {
defer closer.Close()
}
log.Printf("Could not provision iterator for metric: %q\n", iteratorErr)
return nil, -3, iteratorErr
}
} else {
log.Printf("Could not encode metric: %q\n", fingerprintDDOErr)
return nil, -2, fingerprintDDOErr
}
return nil, -1, errors.New("Unknown error occurred while querying metric watermarks.")
}
// TODO(mtp): Holes in the data!
func (l *LevigoMetricPersistence) GetSamplesForMetric(metric Metric, interval Interval) ([]Samples, error) {
metricDDO := ddoFromMetric(metric)
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
start := &data.SampleKeyDDO{
Fingerprint: fingerprintDDO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
emission := make([]Samples, 0)
if encode, encodeErr := NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &data.SampleKeyDDO{}
value := &data.SampleValueDDO{}
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if *fingerprintDDO.Signature == *key.Fingerprint.Signature {
// Wart
if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() {
emission = append(emission, Samples{
Value: SampleValue(*value.Value),
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {
break
}
} else {
break
}
} else {
return nil, valueUnmarshalErr
}
} else {
return nil, keyUnmarshalErr
}
}
return emission, nil
} else {
log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, encodeErr
}
} else {
log.Printf("Could not acquire iterator: %q\n", iteratorErr)
return nil, iteratorErr
}
} else {
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDDOErr)
return nil, fingerprintDDOErr
}
return nil, errors.New("Unknown error occurred while querying metric watermarks.")
}

View file

@ -0,0 +1,590 @@
package main
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
data "github.com/matttproud/prometheus/model/generated"
"io/ioutil"
"math"
"math/rand"
"os"
"testing"
"testing/quick"
"time"
)
func TestBasicLifecycle(t *testing.T) {
temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "levigo_metric_persistence_test")
if temporaryDirectoryErr != nil {
t.Errorf("Could not create test directory: %q\n", temporaryDirectoryErr)
return
}
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, openErr := NewLevigoMetricPersistence(temporaryDirectory)
if openErr != nil {
t.Errorf("Could not create Levigo Metric Persistence: %q\n", openErr)
}
if persistence == nil {
t.Errorf("Received nil Levigo Metric Persistence.\n")
return
}
closeErr := persistence.Close()
if closeErr != nil {
t.Errorf("Could not close Levigo Metric Persistence: %q\n", closeErr)
}
}
func TestReadEmpty(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevigoMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
hasLabelPair := func(x int) bool {
name := string(x)
value := string(x)
ddo := &data.LabelPairDDO{
Name: proto.String(name),
Value: proto.String(value),
}
has, hasErr := persistence.HasLabelPair(ddo)
if hasErr != nil {
return false
}
return has == false
}
if hasPairErr := quick.Check(hasLabelPair, nil); hasPairErr != nil {
t.Error(hasPairErr)
}
hasLabelName := func(x int) bool {
name := string(x)
ddo := &data.LabelNameDDO{
Name: proto.String(name),
}
has, hasErr := persistence.HasLabelName(ddo)
if hasErr != nil {
return false
}
return has == false
}
if hasNameErr := quick.Check(hasLabelName, nil); hasNameErr != nil {
t.Error(hasNameErr)
}
getLabelPairFingerprints := func(x int) bool {
name := string(x)
value := string(x)
ddo := &data.LabelPairDDO{
Name: proto.String(name),
Value: proto.String(value),
}
fingerprints, fingerprintsErr := persistence.GetLabelPairFingerprints(ddo)
if fingerprintsErr != nil {
return false
}
if fingerprints == nil {
return false
}
return len(fingerprints.Member) == 0
}
if labelPairFingerprintsErr := quick.Check(getLabelPairFingerprints, nil); labelPairFingerprintsErr != nil {
t.Error(labelPairFingerprintsErr)
}
getLabelNameFingerprints := func(x int) bool {
name := string(x)
ddo := &data.LabelNameDDO{
Name: proto.String(name),
}
fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(ddo)
if fingerprintsErr != nil {
return false
}
if fingerprints == nil {
return false
}
return len(fingerprints.Member) == 0
}
if labelNameFingerprintsErr := quick.Check(getLabelNameFingerprints, nil); labelNameFingerprintsErr != nil {
t.Error(labelNameFingerprintsErr)
}
}
func TestAppendSampleAsPureSparseAppend(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevigoMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendSample := func(x int) bool {
sample := &Sample{
Value: SampleValue(float32(x)),
Timestamp: time.Unix(int64(x), int64(x)),
Labels: LabelPairs{string(x): string(x)},
}
appendErr := persistence.AppendSample(sample)
return appendErr == nil
}
if appendErr := quick.Check(appendSample, nil); appendErr != nil {
t.Error(appendErr)
}
}
func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevigoMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendSample := func(x int) bool {
sample := &Sample{
Value: SampleValue(float32(x)),
Timestamp: time.Unix(int64(x), int64(x)),
Labels: LabelPairs{string(x): string(x)},
}
appendErr := persistence.AppendSample(sample)
if appendErr != nil {
return false
}
labelNameDDO := &data.LabelNameDDO{
Name: proto.String(string(x)),
}
hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelNameDDO)
if hasLabelNameErr != nil {
return false
}
if !hasLabelName {
return false
}
labelPairDDO := &data.LabelPairDDO{
Name: proto.String(string(x)),
Value: proto.String(string(x)),
}
hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPairDDO)
if hasLabelPairErr != nil {
return false
}
if !hasLabelPair {
return false
}
labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelNameDDO)
if labelNameFingerprintsErr != nil {
return false
}
if labelNameFingerprints == nil {
return false
}
if len(labelNameFingerprints.Member) != 1 {
return false
}
labelPairFingerprints, labelPairFingerprintsErr := persistence.GetLabelPairFingerprints(labelPairDDO)
if labelPairFingerprintsErr != nil {
return false
}
if labelPairFingerprints == nil {
return false
}
if len(labelPairFingerprints.Member) != 1 {
return false
}
return true
}
if appendErr := quick.Check(appendSample, nil); appendErr != nil {
t.Error(appendErr)
}
}
func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) {
temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevigoMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
appendSample := func(x int) bool {
sample := &Sample{
Value: SampleValue(float32(x)),
Timestamp: time.Unix(int64(x), 0),
Labels: LabelPairs{"name": "my_metric"},
}
appendErr := persistence.AppendSample(sample)
return appendErr == nil
}
if appendErr := quick.Check(appendSample, nil); appendErr != nil {
t.Error(appendErr)
}
}
func TestStochastic(t *testing.T) {
stochastic := func(x int) bool {
seed := rand.NewSource(int64(x))
random := rand.New(seed)
numberOfMetrics := random.Intn(5) + 1
numberOfSharedLabels := random.Intn(5)
numberOfUnsharedLabels := random.Intn(5)
numberOfSamples := random.Intn(1024) + 2
numberOfRangeScans := random.Intn(3)
temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test")
defer func() {
if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil {
t.Errorf("Could not remove temporary directory: %q\n", removeAllErr)
}
}()
persistence, _ := NewLevigoMetricPersistence(temporaryDirectory)
defer func() {
persistence.Close()
}()
metricTimestamps := make(map[int]map[int64]bool)
metricEarliestSample := make(map[int]int64)
metricNewestSample := make(map[int]int64)
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
sample := &Sample{
Labels: LabelPairs{},
}
sample.Labels["name"] = fmt.Sprintf("metric_index_%d", metricIndex)
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
sample.Labels[fmt.Sprintf("shared_label_%d", sharedLabelIndex)] = fmt.Sprintf("label_%d", sharedLabelIndex)
}
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
sample.Labels[fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)] = fmt.Sprintf("private_label_%d", unsharedLabelIndex)
}
timestamps := make(map[int64]bool)
metricTimestamps[metricIndex] = timestamps
var newestSample int64 = math.MinInt64
var oldestSample int64 = math.MaxInt64
var nextTimestamp func() int64
nextTimestamp = func() int64 {
var candidate int64
candidate = random.Int63n(math.MaxInt32 - 1)
if _, has := timestamps[candidate]; has {
candidate = nextTimestamp()
}
timestamps[candidate] = true
if candidate < oldestSample {
oldestSample = candidate
}
if candidate > newestSample {
newestSample = candidate
}
return candidate
}
for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ {
sample.Timestamp = time.Unix(nextTimestamp(), 0)
sample.Value = SampleValue(sampleIndex)
appendErr := persistence.AppendSample(sample)
if appendErr != nil {
return false
}
}
metricEarliestSample[metricIndex] = oldestSample
metricNewestSample[metricIndex] = newestSample
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
labelPair := &data.LabelPairDDO{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)),
}
hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPair)
if hasLabelPairErr != nil {
return false
}
if hasLabelPair != true {
return false
}
labelName := &data.LabelNameDDO{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
}
hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelName)
if hasLabelNameErr != nil {
return false
}
if hasLabelName != true {
return false
}
}
}
for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ {
labelName := &data.LabelNameDDO{
Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)),
}
fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName)
if fingerprintsErr != nil {
return false
}
if fingerprints == nil {
return false
}
if len(fingerprints.Member) != numberOfMetrics {
return false
}
}
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
labelPair := &data.LabelPairDDO{
Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)),
Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)),
}
hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPair)
if hasLabelPairErr != nil {
return false
}
if hasLabelPair != true {
return false
}
labelPairFingerprints, labelPairFingerprintsErr := persistence.GetLabelPairFingerprints(labelPair)
if labelPairFingerprintsErr != nil {
return false
}
if labelPairFingerprints == nil {
return false
}
if len(labelPairFingerprints.Member) != 1 {
return false
}
labelName := &data.LabelNameDDO{
Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)),
}
hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelName)
if hasLabelNameErr != nil {
return false
}
if hasLabelName != true {
return false
}
labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelName)
if labelNameFingerprintsErr != nil {
return false
}
if labelNameFingerprints == nil {
return false
}
if len(labelNameFingerprints.Member) != 1 {
return false
}
}
metric := make(Metric)
metric["name"] = fmt.Sprintf("metric_index_%d", metricIndex)
for i := 0; i < numberOfSharedLabels; i++ {
metric[fmt.Sprintf("shared_label_%d", i)] = fmt.Sprintf("label_%d", i)
}
for i := 0; i < numberOfUnsharedLabels; i++ {
metric[fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i)] = fmt.Sprintf("private_label_%d", i)
}
watermarks, count, watermarksErr := persistence.GetWatermarksForMetric(metric)
if watermarksErr != nil {
return false
}
if watermarks == nil {
return false
}
if count != numberOfSamples {
return false
}
minimum := metricEarliestSample[metricIndex]
maximum := metricNewestSample[metricIndex]
spread := maximum - minimum
for i := 0; i < numberOfRangeScans; i++ {
timestamps := metricTimestamps[metricIndex]
var first int64 = 0
var second int64 = 0
for {
first = minimum + random.Int63n(spread)
if _, has := timestamps[first]; has {
break
}
}
for {
second = minimum + random.Int63n(spread)
if _, has := timestamps[second]; has && second != first {
break
}
}
var begin int64 = 0
var end int64 = 0
if first > second {
begin = second
end = first
} else {
begin = first
end = second
}
interval := Interval{
OldestInclusive: time.Unix(begin, 0),
NewestInclusive: time.Unix(end, 0),
}
rangeValues, rangeErr := persistence.GetSamplesForMetric(metric, interval)
if rangeErr != nil {
return false
}
if len(rangeValues) < 2 {
return false
}
}
}
return true
}
if stochasticError := quick.Check(stochastic, nil); stochasticError != nil {
t.Error(stochasticError)
}
}

216
levigo_persistence.go Normal file
View file

@ -0,0 +1,216 @@
package main
import (
"github.com/jmhodges/levigo"
"io"
)
type LevigoCloser interface {
Close() error
}
type LevigoPersistence struct {
cache *levigo.Cache
filterPolicy *levigo.FilterPolicy
options *levigo.Options
storage *levigo.DB
readOptions *levigo.ReadOptions
writeOptions *levigo.WriteOptions
}
func NewLevigoPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevigoPersistence, error) {
options := levigo.NewOptions()
options.SetCreateIfMissing(true)
options.SetParanoidChecks(true)
cache := levigo.NewLRUCache(cacheCapacity)
options.SetCache(cache)
filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded)
options.SetFilterPolicy(filterPolicy)
storage, openErr := levigo.Open(storageRoot, options)
readOptions := levigo.NewReadOptions()
writeOptions := levigo.NewWriteOptions()
writeOptions.SetSync(true)
emission := &LevigoPersistence{
cache: cache,
filterPolicy: filterPolicy,
options: options,
readOptions: readOptions,
storage: storage,
writeOptions: writeOptions,
}
return emission, openErr
}
func (l *LevigoPersistence) Close() error {
if l.storage != nil {
l.storage.Close()
}
defer func() {
if l.filterPolicy != nil {
l.filterPolicy.Close()
}
}()
defer func() {
if l.cache != nil {
l.cache.Close()
}
}()
defer func() {
if l.options != nil {
l.options.Close()
}
}()
defer func() {
if l.readOptions != nil {
l.readOptions.Close()
}
}()
defer func() {
if l.writeOptions != nil {
l.writeOptions.Close()
}
}()
return nil
}
func (l *LevigoPersistence) Get(value Encoder) ([]byte, error) {
var key []byte
var keyError error
if key, keyError = value.Encode(); keyError == nil {
return l.storage.Get(l.readOptions, key)
}
return nil, keyError
}
func (l *LevigoPersistence) Has(value Encoder) (bool, error) {
if value, getError := l.Get(value); getError != nil {
return false, getError
} else if value == nil {
return false, nil
}
return true, nil
}
func (l *LevigoPersistence) Drop(value Encoder) error {
var key []byte
var keyError error
if key, keyError = value.Encode(); keyError == nil {
if deleteError := l.storage.Delete(l.writeOptions, key); deleteError != nil {
return deleteError
}
return nil
}
return keyError
}
func (l *LevigoPersistence) Put(key Encoder, value Encoder) error {
var keyEncoded []byte
var keyError error
if keyEncoded, keyError = key.Encode(); keyError == nil {
if valueEncoded, valueError := value.Encode(); valueError == nil {
if putError := l.storage.Put(l.writeOptions, keyEncoded, valueEncoded); putError != nil {
return putError
}
} else {
return valueError
}
return nil
}
return keyError
}
func (l *LevigoPersistence) GetAll() ([]Pair, error) {
snapshot := l.storage.NewSnapshot()
defer l.storage.ReleaseSnapshot(snapshot)
readOptions := levigo.NewReadOptions()
defer readOptions.Close()
readOptions.SetSnapshot(snapshot)
iterator := l.storage.NewIterator(readOptions)
defer iterator.Close()
iterator.SeekToFirst()
result := make([]Pair, 0)
for iterator := iterator; iterator.Valid(); iterator.Next() {
result = append(result, Pair{Left: iterator.Key(), Right: iterator.Value()})
}
iteratorError := iterator.GetError()
if iteratorError != nil {
return nil, iteratorError
}
return result, nil
}
type iteratorCloser struct {
iterator *levigo.Iterator
readOptions *levigo.ReadOptions
snapshot *levigo.Snapshot
storage *levigo.DB
}
func (i *iteratorCloser) Close() error {
defer func() {
if i.storage != nil {
if i.snapshot != nil {
i.storage.ReleaseSnapshot(i.snapshot)
}
}
}()
defer func() {
if i.iterator != nil {
i.iterator.Close()
}
}()
defer func() {
if i.readOptions != nil {
i.readOptions.Close()
}
}()
return nil
}
func (l *LevigoPersistence) GetIterator() (*levigo.Iterator, io.Closer, error) {
snapshot := l.storage.NewSnapshot()
readOptions := levigo.NewReadOptions()
readOptions.SetSnapshot(snapshot)
iterator := l.storage.NewIterator(readOptions)
closer := &iteratorCloser{
iterator: iterator,
readOptions: readOptions,
snapshot: snapshot,
storage: l.storage,
}
return iterator, closer, nil
}

16
main.go Normal file
View file

@ -0,0 +1,16 @@
package main
import (
"code.google.com/p/gorest"
"net/http"
)
func main() {
m, _ := NewLevigoMetricPersistence("/tmp/metrics")
s := &MetricsService{
persistence: m,
}
gorest.RegisterService(s)
http.Handle("/", gorest.Handle())
http.ListenAndServe(":8787", nil)
}

43
metric.go Normal file
View file

@ -0,0 +1,43 @@
package main
import (
"crypto/md5"
"encoding/hex"
"io"
"time"
)
type Fingerprint string
type LabelPairs map[string]string
type Metric map[string]string
type SampleValue float32
type Sample struct {
Labels LabelPairs
Value SampleValue
Timestamp time.Time
}
type Samples struct {
Value SampleValue
Timestamp time.Time
}
type Interval struct {
OldestInclusive time.Time
NewestInclusive time.Time
}
func FingerprintFromString(value string) Fingerprint {
hash := md5.New()
io.WriteString(hash, value)
return Fingerprint(hex.EncodeToString(hash.Sum([]byte{})))
}
func FingerprintFromByteArray(value []byte) Fingerprint {
hash := md5.New()
hash.Write(value)
return Fingerprint(hex.EncodeToString(hash.Sum([]byte{})))
}

15
metric_persistence.go Normal file
View file

@ -0,0 +1,15 @@
package main
type MetricPersistence interface {
Close() error
AppendSample(sample *Sample) error
GetLabelNames() ([]string, error)
GetLabelPairs() ([]LabelPairs, error)
GetMetrics() ([]LabelPairs, error)
GetMetricFingerprintsForLabelPairs(labelSets []*LabelPairs) ([]*Fingerprint, error)
RecordLabelNameFingerprint(sample *Sample) error
RecordFingerprintWatermark(sample *Sample) error
GetFingerprintLabelPairs(fingerprint Fingerprint) (LabelPairs, error)
}

10
model/Makefile Normal file
View file

@ -0,0 +1,10 @@
export PATH := $(PATH):/Users/mtp/Development/go/bin
all: data.proto
data.proto:
protoc --go_out=generated/ data.proto
clean:
rm -rf generated/*
.PHONY: data.proto

86
model/data.proto Normal file
View file

@ -0,0 +1,86 @@
package generated;
message LabelPairDDO {
optional int64 version = 1 [default = 1];
optional string name = 2;
optional string value = 3;
}
message MetricDDO {
optional int64 version = 1 [default = 1];
repeated LabelPairDDO label_pair = 2;
}
message FingerprintCollectionDDO {
optional int64 version = 1 [default = 1];
repeated FingerprintDDO member = 2;
}
message LabelPairCollectionDDO {
optional int64 version = 1 [default = 1];
repeated LabelPairDDO member = 2;
}
message LabelNameDDO {
optional int64 version = 1 [default = 1];
optional string name = 2;
}
message FingerprintDDO {
optional int64 version = 1 [default = 1];
// bytes
optional string signature = 2;
}
message WatermarkDDO {
optional int64 version = 1 [default = 1];
optional int64 timestamp = 2;
}
message SampleKeyDDO {
optional int64 version = 1 [default = 1];
optional FingerprintDDO fingerprint = 2;
optional bytes timestamp = 3;
}
message SampleValueDDO {
optional int64 version = 1 [default = 1];
optional float value = 2;
}
// TOO OLD
message MembershipIndexValueDDO {
}
message LabelNameAndValueIndexDDO {
optional string name = 1;
optional string value = 2;
}
message LabelNameValuesDDO {
repeated string value = 1;
}
message LabelNameAndValueToMetricDDO {
repeated string metric = 1;
}
message MetricToWindowDDO {
repeated int64 window = 1;
}
message WindowSampleDDO {
repeated float value = 1;
}

15
persistence.go Normal file
View file

@ -0,0 +1,15 @@
package main
type Pair struct {
Left []byte
Right []byte
}
type Persistence interface {
Has(key Encoder) (bool, error)
Get(key Encoder) ([]byte, error)
GetAll() ([]Pair, error)
Drop(key Encoder) error
Put(key Encoder, value Encoder) error
Close() error
}

View file

@ -0,0 +1,19 @@
package main
import (
"code.google.com/p/goprotobuf/proto"
)
type ProtocolBufferEncoder struct {
message proto.Message
}
func (p *ProtocolBufferEncoder) Encode() ([]byte, error) {
return proto.Marshal(p.message)
}
func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder {
return &ProtocolBufferEncoder{
message: message,
}
}

57
service.go Normal file
View file

@ -0,0 +1,57 @@
package main
import (
"code.google.com/p/gorest"
)
type MetricsService struct {
gorest.RestService `root:"/" consumes:"application/json" produces:"application/json"`
persistence *LevigoMetricPersistence
listLabels gorest.EndPoint `method:"GET" path:"/labels/" output:"[]string"`
listLabelPairs gorest.EndPoint `method:"GET" path:"/label-pairs/" output:"[]LabelPairs"`
listMetrics gorest.EndPoint `method:"GET" path:"/metrics/" output:"[]LabelPairs"`
appendSample gorest.EndPoint `method:"POST" path:"/metrics/" postdata:"Sample"`
}
func (m MetricsService) ListLabels() []string {
labels, labelsError := m.persistence.GetLabelNames()
if labelsError != nil {
m.ResponseBuilder().SetResponseCode(500)
}
return labels
}
func (m MetricsService) ListLabelPairs() []LabelPairs {
labelPairs, labelPairsError := m.persistence.GetLabelPairs()
if labelPairsError != nil {
m.ResponseBuilder().SetResponseCode(500)
}
return labelPairs
}
func (m MetricsService) ListMetrics() []LabelPairs {
metrics, metricsError := m.persistence.GetMetrics()
if metricsError != nil {
m.ResponseBuilder().SetResponseCode(500)
}
return metrics
}
func (m MetricsService) AppendSample(s Sample) {
responseBuilder := m.ResponseBuilder()
if appendError := m.persistence.AppendSample(&s); appendError == nil {
responseBuilder.SetResponseCode(200)
return
}
responseBuilder.SetResponseCode(500)
}

39
utility/set.go Normal file
View file

@ -0,0 +1,39 @@
package utility
type Set map[interface{}]bool
func (s Set) Add(v interface{}) {
s[v] = true
}
func (s Set) Remove(v interface{}) {
delete(s, v)
}
func (s Set) Elements() []interface{} {
result := make([]interface{}, 0, len(s))
for k, _ := range s {
result = append(result, k)
}
return result
}
func (s Set) Has(v interface{}) bool {
_, p := s[v]
return p
}
func (s Set) Intersection(o Set) Set {
result := make(Set)
for k, _ := range s {
if o.Has(k) {
result[k] = true
}
}
return result
}

109
utility/set_test.go Normal file
View file

@ -0,0 +1,109 @@
package utility
import (
"testing"
"testing/quick"
)
func TestSetEqualMemberships(t *testing.T) {
f := func(x int) bool {
first := make(Set)
second := make(Set)
first.Add(x)
second.Add(x)
intersection := first.Intersection(second)
members := intersection.Elements()
return members != nil && len(members) == 1 && members[0] == x
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
func TestSetInequalMemberships(t *testing.T) {
f := func(x int) bool {
first := make(Set)
second := make(Set)
first.Add(x)
intersection := first.Intersection(second)
members := intersection.Elements()
return members != nil && len(members) == 0
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
func TestSetAsymmetricMemberships(t *testing.T) {
f := func(x int) bool {
first := make(Set)
second := make(Set)
first.Add(x)
second.Add(x)
first.Add(x + 1)
second.Add(x + 1)
second.Add(x + 2)
first.Add(x + 2)
first.Add(x + 3)
second.Add(x + 4)
intersection := first.Intersection(second)
members := intersection.Elements()
return members != nil && len(members) == 3
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
func TestSetRemoval(t *testing.T) {
f := func(x int) bool {
first := make(Set)
first.Add(x)
first.Remove(x)
members := first.Elements()
return members != nil && len(members) == 0
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
func TestSetAdditionAndRemoval(t *testing.T) {
f := func(x int) bool {
first := make(Set)
second := make(Set)
first.Add(x)
second.Add(x)
first.Add(x + 1)
first.Remove(x + 1)
intersection := first.Intersection(second)
members := intersection.Elements()
return members != nil && len(members) == 1 && members[0] == x
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}