diff --git a/Makefile b/Makefile
new file mode 100644
index 000000000..a6b7d8f14
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,23 @@
+TEST_ARTIFACTS = prometheus
+
+all: test
+
+test: build
+	go test ./...
+
+build:
+	$(MAKE) -C model
+	go build ./...
+	go build .
+
+clean:
+	rm -rf $(TEST_ARTIFACTS)
+	$(MAKE) -C model clean
+	-find . -type f -iname '*~' -exec rm '{}' ';'
+	-find . -type f -iname '*#' -exec rm '{}' ';'
+	-find . -type f -iname '.*' -exec rm '{}' ';'
+
+format:
+	find . -iname '*.go' | grep -v generated | xargs -n1 gofmt -w
+
+.PHONY: build clean format test
diff --git a/README.md b/README.md
index 464dcc784..682fbbf65 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,22 @@
-prometheus
+Prometheus
 ==========
 
-Bedecke deinen Himmel, Zeus!
\ No newline at end of file
+Bedecke deinen Himmel, Zeus! ("Cover your heaven, Zeus!") A new kid is in town.
+
+Prerequisites
+=============
+1. Go 1.0.x.
+2. LevelDB (https://code.google.com/p/leveldb/).
+3. The Protocol Buffers compiler (http://code.google.com/p/protobuf/).
+4. goprotobuf, the code generator and runtime library (http://code.google.com/p/goprotobuf/).
+5. Levigo, a Go wrapper around LevelDB's C library (https://github.com/jmhodges/levigo).
+
+Initial Hurdles
+===============
+1. A bit of this grew organically without an easy way of binding it all together. The tests pass, but slowly: they were written for end-to-end coverage of the whole storage model rather than for speed. Optimizing them is an immediate task.
+2. The Protocol Buffer generator for Go changed the API of its emitted output. This will need to be fixed before other contributors can participate.
+
+
+Milestones
+==========
+1. In-memory archive, basic rule language, simple computation engine, and naive exposition system.
diff --git a/coding/indexable/time.go b/coding/indexable/time.go
new file mode 100644
index 000000000..5daead3b1
--- /dev/null
+++ b/coding/indexable/time.go
@@ -0,0 +1,26 @@
+package indexable
+
+import (
+	"encoding/binary"
+	"time"
+)
+
+var (
+	EarliestTime = EncodeTime(time.Unix(0, 0))
+)
+
+func EncodeTimeInto(dst []byte, t time.Time) {
+	binary.BigEndian.PutUint64(dst, uint64(t.Unix()))
+}
+
+func EncodeTime(t time.Time) []byte {
+	buffer := make([]byte, 8)
+
+	EncodeTimeInto(buffer, t)
+
+	return buffer
+}
+
+func DecodeTime(src []byte) time.Time {
+	return time.Unix(int64(binary.BigEndian.Uint64(src)), 0)
+}
diff --git a/coding/indexable/time_test.go b/coding/indexable/time_test.go
new file mode 100644
index 000000000..e8c646e66
--- /dev/null
+++ b/coding/indexable/time_test.go
@@ -0,0 +1,25 @@
+package indexable
+
+import (
+	"math/rand"
+	"testing"
+	"testing/quick"
+	"time"
+)
+
+func TestTimeEndToEnd(t *testing.T) {
+	tester := func(x int) bool {
+		random := rand.New(rand.NewSource(int64(x)))
+		buffer := make([]byte, 8)
+		incoming := time.Unix(random.Int63(), 0)
+
+		EncodeTimeInto(buffer, incoming)
+		outgoing := DecodeTime(buffer)
+
+		return incoming.Equal(outgoing) && incoming.Unix() == outgoing.Unix()
+	}
+
+	if err := quick.Check(tester, nil); err != nil {
+		t.Error(err)
+	}
+}
diff --git a/configuration/grammar/.#example.rules b/configuration/grammar/.#example.rules
new file mode 120000
index 000000000..f26fc01a4
--- /dev/null
+++ b/configuration/grammar/.#example.rules
@@ -0,0 +1 @@
+mtp@Matt.local.225
\ No newline at end of file
diff --git a/configuration/grammar/example.rules b/configuration/grammar/example.rules
new file mode 100644
index 000000000..6dc1bce03
--- /dev/null
+++ b/configuration/grammar/example.rules
@@ -0,0 +1,61 @@
+// {
+// set evaluation_interval = "30s";
+// target "http://www.example.com:80/metrics" + +// rule archived {name="instance:requests_total:sum"} = sum by (instance) {name="requests"}; +// rule archived {name="instance:requests-by_result_code_total:sum"} = +// sum by (instance,result_code) {name="requests"}; +// rule archived {name="instance:requests-by_result_code:sum"} = +// {name="instances:requests-by_result_code"} +// / by (instance) +// {name="instances:requests_total:sum"}; +// } + +{ + set evaluation_interval = “2m”; + + permanent { + rule { + labels { + set name = “process:request_rate_qps-allowed:sum”; + set job = “frontend”; + } + + // Values may be a literal or an expression. + set value = 500; + + // I wonder: Is it practical to express labels similar to above in a better DSL? + // set value = EXPRESSION … WITH LABELS {foo=”bar”}; + } + } + + rule { + // This provides a way of overriding existing labels, unsetting them, or + // appending new ones. + labels { + // “name” is obligatory. + set name = “process:requests_total:sum”; + } + + // Here we are extracting a metric with the name label value of “requests” from + // job “frontend” and merely accumulating this into a named variable. It is + // similar to standing. Each sum is keyed to the UNIX process and job from which + // it came. + set value = SUM({name=”requests”, job=”frontend”}) BY (process, job); + } + + rule { + // This provides a way of overriding existing labels, unsetting them, or + // appending new ones. + labels { + // “name” is obligatory. + set name = “process:request_qps:rate5m”; + } + + // Here we are extracting a metric with the name label value of “requests” from + // job “frontend” and merely accumulating this into a named variable. It is + // similar to standing. + set value = RATE({name=”process:requests_total:sum”, job=”frontend”} OVER “5m”); + } + } +} \ No newline at end of file diff --git a/configuration/grammar/lex.go b/configuration/grammar/lex.go new file mode 100644 index 000000000..430ac3602 --- /dev/null +++ b/configuration/grammar/lex.go @@ -0,0 +1,506 @@ +package main + +import ( + "fmt" + "strings" + "unicode" + "unicode/utf8" +) + +type itemType int + +const ( + itemError itemType = iota + itemEof + itemRuleContextOpen + itemRuleContextClose + itemSetKeyword + itemKey + itemEqual + itemValue + itemSemicolon + itemQuote + itemRulesKeyword + itemRuleKeyword + itemRuleName + itemRuleValue + itemOpenBracket + itemCloseBracket +) + +const ( + literalCloseBracket = "}" + literalEof = -1 + literalOpenBracket = "{" + literalRule = "rule" + literalRules = "rules" + literalSet = "set" + literalEqual = "=" + literalQuote = `"` + literalSemicolon = ";" +) + +type element struct { + itemType itemType + value string +} + +func (e element) String() string { + switch e.itemType { + case itemError: + return e.value + case itemEof: + return "EOF" + } + return fmt.Sprintf("%s %q", e.itemType, e.value) +} + +type lexer struct { + input string + elements chan element + start int + position int + runeWidth int +} + +func lex(name, input string) (*lexer, chan element) { + l := &lexer{ + input: input, + elements: make(chan element), + } + go l.run() + return l, l.elements +} + +func (l *lexer) run() { + for state := lexBody; state != nil; { + state = state(l) + } + close(l.elements) +} + +func (l *lexer) next() (rune rune) { + if l.position >= len(l.input) { + l.runeWidth = 0 + return literalEof + } + + rune, l.runeWidth = utf8.DecodeRuneInString(l.input[l.position:]) + l.position += l.runeWidth + return rune +} + +func lexBody(l *lexer) lexFunction { + 
if strings.HasPrefix(l.input[l.position:], literalOpenBracket) { + return lexRulesetOpen + } + + switch rune := l.next(); { + case rune == literalEof: + l.emit(itemEof) + return nil + case isSpace(rune): + l.ignore() + default: + return l.errorf("illegal input") + } + return lexBody +} + +func lexRulesetOpen(l *lexer) lexFunction { + l.position += len(literalOpenBracket) + l.emit(itemRuleContextOpen) + + return lexRulesetInside +} + +func lexRulesetInside(l *lexer) lexFunction { + if strings.HasPrefix(l.input[l.position:], literalCloseBracket) { + return lexRulesetClose + } + + if strings.HasPrefix(l.input[l.position:], literalSet) { + return lexRuleSetKeyword + } + + if strings.HasPrefix(l.input[l.position:], literalRules) { + return lexRulesetRules + } + + switch rune := l.next(); { + case rune == literalEof: + return l.errorf("unterminated ruleset") + case isSpace(rune): + l.ignore() + case rune == ';': + l.ignore() + default: + return l.errorf("unrecognized input") + } + return lexRulesetInside +} + +func lexRulesetRules(l *lexer) lexFunction { + l.position += len(literalRules) + l.emit(itemRulesKeyword) + + return lexRulesetRulesBlockOpen +} + +func lexRulesetRulesBlockOpen(l *lexer) lexFunction { + if strings.HasPrefix(l.input[l.position:], literalOpenBracket) { + l.position += len(literalOpenBracket) + l.emit(itemOpenBracket) + return lexRulesetRulesBlockInside + } + + switch rune := l.next(); { + case isSpace(rune): + l.ignore() + default: + return l.errorf("unrecognized input") + } + + return lexRulesetRulesBlockOpen +} + +func lexRulesetRulesBlockInside(l *lexer) lexFunction { + if strings.HasPrefix(l.input[l.position:], literalRule) { + return lexRulesetRuleBegin + } + + if strings.HasPrefix(l.input[l.position:], literalCloseBracket) { + return lexRulesetRulesBlockClose + } + + switch rune := l.next(); { + case isSpace(rune): + l.ignore() + default: + return l.errorf("unrecognized input") + } + + return lexRulesetRulesBlockInside +} + +func lexRulesetRulesBlockClose(l *lexer) lexFunction { + l.position += len(literalCloseBracket) + l.emit(itemCloseBracket) + + return lexRulesetInside +} + +func lexRulesetRuleBegin(l *lexer) lexFunction { + l.position += len(literalRule) + l.emit(itemRuleKeyword) + + return lexRulesetRuleName +} + +func lexRulesetRuleName(l *lexer) lexFunction { + + switch rune := l.next(); { + case isSpace(rune): + l.ignore() + case isIdentifierOpen(rune): + for { + switch rune := l.next(); { + case isMetricIdentifier(rune): + case rune == '=': + l.backup() + l.emit(itemRuleName) + return lexRulesetRuleEqual + default: + return l.errorf("bad rule name") + } + } + default: + return l.errorf("unrecognized input") + } + + return lexRulesetRuleName +} + +func lexRulesetRuleEqual(l *lexer) lexFunction { + if strings.HasPrefix(l.input[l.position:], literalEqual) { + l.position += len(literalEqual) + l.emit(itemEqual) + return lexRulesetRuleDefinitionBegin + } + + switch rune := l.next(); { + case isSpace(rune): + l.ignore() + default: + return l.errorf("unrecognized input") + } + + return lexRulesetRuleEqual +} + +func lexRulesetRuleDefinitionBegin(l *lexer) lexFunction { + switch rune := l.next(); { + case isSpace(rune): + l.ignore() + case isIdentifierOpen(rune): + for { + switch rune := l.next(); { + case isMetricIdentifier(rune): + case rune == ';': + l.emit(itemRuleValue) + return lexRulesetRulesBlockInside + default: + return l.errorf("unrecognized input") + } + } + default: + return l.errorf("unrecognized input") + } + + return lexRulesetRuleDefinitionBegin +} 
+ +func lexRuleSetKeyword(l *lexer) lexFunction { + l.position += len(literalSet) + + l.emit(itemSetKeyword) + + return lexRuleSetInside +} + +func (l *lexer) backup() { + l.position -= l.runeWidth +} + +func isIdentifierOpen(rune rune) bool { + switch rune := rune; { + case unicode.IsLetter(rune): + return true + case rune == '_': + return true + } + + return false +} + +func lexRuleSetInside(l *lexer) lexFunction { + switch rune := l.next(); { + case rune == literalEof: + return l.errorf("unterminated set statement") + case isSpace(rune): + l.ignore() + case rune == ';': + return l.errorf("unexpected ;") + case rune == '=': + return l.errorf("unexpected =") + case isIdentifierOpen(rune): + l.backup() + return lexRuleSetKey + default: + return l.errorf("unrecognized input") + } + + return lexRuleSetInside +} + +func isIdentifier(rune rune) bool { + switch rune := rune; { + case isIdentifierOpen(rune): + return true + case unicode.IsDigit(rune): + return true + } + return false +} + +func isMetricIdentifier(rune rune) bool { + switch rune := rune; { + case isIdentifier(rune): + return true + case rune == ':': + return true + } + + return false +} + +func (l *lexer) peek() rune { + rune := l.next() + l.backup() + return rune +} + +func (l *lexer) atTerminator() bool { + switch rune := l.peek(); { + case isSpace(rune): + return true + case rune == ';': + return true + } + + return false +} + +func lexRuleSetKey(l *lexer) lexFunction { + switch rune := l.next(); { + case rune == literalEof: + return l.errorf("incomplete set statement") + case isIdentifier(rune): + default: + l.backup() + if !l.atTerminator() { + return l.errorf("unexpected character %+U %q", rune, rune) + } + l.emit(itemKey) + return lexRuleSetEqual + } + return lexRuleSetKey +} + +func lexRuleSetEqual(l *lexer) lexFunction { + if strings.HasPrefix(l.input[l.position:], literalEqual) { + l.position += len(literalEqual) + l.emit(itemEqual) + return lexRuleSetValueOpenQuote + } + + switch rune := l.next(); { + case rune == literalEof: + return l.errorf("incomplete set statement") + case isSpace(rune): + l.ignore() + default: + return l.errorf("unexpected character %+U %q", rune, rune) + } + return lexRuleSetEqual +} + +func lexRuleSetValueOpenQuote(l *lexer) lexFunction { + if strings.HasPrefix(l.input[l.position:], literalQuote) { + l.position += len(literalQuote) + l.emit(itemQuote) + + return lexRuleSetValue + } + + switch rune := l.next(); { + case rune == literalEof: + return l.errorf("incomplete set statement") + case isSpace(rune): + l.ignore() + default: + return l.errorf("unexpected character %+U %q", rune, rune) + } + return lexRuleSetValueOpenQuote +} + +func lexRuleSetValue(l *lexer) lexFunction { + var lastRuneEscapes bool = false + + for { + rune := l.next() + { + if rune == '"' && !lastRuneEscapes { + l.backup() + l.emit(itemValue) + return lexRuleSetValueCloseQuote + } + + if !lastRuneEscapes && rune == '\\' { + lastRuneEscapes = true + } else { + lastRuneEscapes = false + } + } + } + + panic("unreachable") +} + +func lexRuleSetValueCloseQuote(l *lexer) lexFunction { + if strings.HasPrefix(l.input[l.position:], literalQuote) { + l.position += len(literalQuote) + l.emit(itemQuote) + + return lexRuleSetSemicolon + } + + switch rune := l.next(); { + case isSpace(rune): + l.ignore() + default: + return l.errorf("unexpected character %+U %q", rune, rune) + + } + return lexRuleSetValueCloseQuote + +} + +func lexRuleSetSemicolon(l *lexer) lexFunction { + if strings.HasPrefix(l.input[l.position:], literalSemicolon) { + 
l.position += len(literalSemicolon) + l.emit(itemSemicolon) + return lexRulesetInside + } + + switch rune := l.next(); { + case isSpace(rune): + l.ignore() + default: + return l.errorf("unexpected character %+U %q", rune, rune) + } + return lexRuleSetSemicolon +} + +func (l *lexer) ignore() { + l.start = l.position +} + +func (l *lexer) errorf(format string, args ...interface{}) lexFunction { + l.elements <- element{itemError, fmt.Sprintf(format, args...)} + return nil +} + +func isSpace(rune rune) bool { + switch rune { + case ' ', '\t', '\n', '\r': + return true + } + return false +} + +func lexRulesetClose(l *lexer) lexFunction { + l.position += len(literalCloseBracket) + l.emit(itemCloseBracket) + + return lexBody +} + +func (l *lexer) emit(i itemType) { + l.elements <- element{i, l.input[l.start:l.position]} + l.start = l.position +} + +type lexFunction func(*lexer) lexFunction + +func main() { + in := `{ + set evaluation_interval = "10m"; + + rules { + } + + + set name = "your mom"; + + + } + { + set evaluation_interval = "30m"; + }` + fmt.Println(in) + _, v := lex("", in) + for value := range v { + fmt.Println(value) + } +} diff --git a/configuration/grammar/lex_test.goold b/configuration/grammar/lex_test.goold new file mode 100644 index 000000000..c2b1ac4e6 --- /dev/null +++ b/configuration/grammar/lex_test.goold @@ -0,0 +1,192 @@ +package main + +import ( + "testing" +) + +type lexTest struct { + name string + input string + elements []element +} + +var ( + tEof = element{itemEof, ""} + tOpenRuleset = element{itemOpenBracket, "{"} + tCloseBracket = element{itemCloseBracket, "}"} + tIllegal = element{itemError, "illegal input"} + tSet = element{itemSetKeyword, "set"} + tEqual = element{itemEqual, "="} + tQuote = element{itemQuote, `"`} + tSemicolon = element{itemSemicolon, ";"} + tRules = element{itemRulesKeyword, "rules"} +) + +var lexTests = []lexTest{ + { + "empty", + "", + []element{ + tEof, + }, + }, + { + "empty with new line", + "\n\n", + []element{ + tEof, + }, + }, + { + "one empty ruleset", + "{}", + []element{ + tOpenRuleset, + tCloseBracket, + tEof, + }, + }, + { + "one empty ruleset distributed over new line", + "{\n}", + []element{ + tOpenRuleset, + tCloseBracket, + tEof, + }, + }, + { + "two empty rulesets", + "{} {}", + []element{ + tOpenRuleset, + tCloseBracket, + tOpenRuleset, + tCloseBracket, + tEof, + }, + }, + { + "two empty rulesets distributed over new line", + "{}\n{}", + []element{ + tOpenRuleset, + tCloseBracket, + tOpenRuleset, + tCloseBracket, + tEof, + }, + }, + { + "garbage", + "garbage", + []element{tIllegal}, + }, + { + "one set", + `{ set foo = "bar"; }`, + []element{ + tOpenRuleset, + tSet, + element{itemKey, "foo"}, + tEqual, + tQuote, + element{itemValue, "bar"}, + tQuote, + tSemicolon, + tCloseBracket, + tEof, + }, + }, + { + "one set over multiple lines", + `{ + set + foo + = + "bar" + ; + }`, + []element{tOpenRuleset, tSet, element{itemKey, "foo"}, tEqual, tQuote, element{itemValue, "bar"}, tQuote, tSemicolon, tCloseBracket, tEof}, + }, + { + "two sets", + `{ set foo = "bar";set baz = "qux"; }`, + []element{ + tOpenRuleset, + tSet, + element{itemKey, "foo"}, + tEqual, + tQuote, + element{itemValue, "bar"}, + tQuote, + tSemicolon, + tSet, + element{itemKey, "baz"}, + tEqual, + tQuote, + element{itemValue, "qux"}, + tQuote, + tSemicolon, + tCloseBracket, + tEof, + }, + }, + { + "two over multiple lines", + `{ set foo = "bar"; + set +baz += +"qux" +; +}`, + []element{ + tOpenRuleset, + tSet, + element{itemKey, "foo"}, + tEqual, + tQuote, + 
element{itemValue, "bar"}, + tQuote, + tSemicolon, + tSet, + element{itemKey, "baz"}, + tEqual, + tQuote, + element{itemValue, "qux"}, + tQuote, + tSemicolon, + tCloseBracket, + tEof, + }, + }, +} + +func collect(l *lexTest) []element { + _, v := lex("", l.input) + + emission := make([]element, 0) + + for i := range v { + emission = append(emission, i) + } + + return emission +} + +func TestFoo(t *testing.T) { + for _, test := range lexTests { + e := collect(&test) + + if len(e) != len(test.elements) { + t.Errorf("%s: got\n\n\t%v\nexpected\n\n\t%v", test.name, e, test.elements) + } + + for i, _ := range test.elements { + if test.elements[i] != e[i] { + t.Errorf("%s[%d]: got\n\n\t%v\nexpected\n\n\t%v", test.name, i, e[i], test.elements[i]) + } + } + } +} diff --git a/encoder.go b/encoder.go new file mode 100644 index 000000000..96824f179 --- /dev/null +++ b/encoder.go @@ -0,0 +1,5 @@ +package main + +type Encoder interface { + Encode() ([]byte, error) +} diff --git a/index.go b/index.go new file mode 100644 index 000000000..e8d7144e8 --- /dev/null +++ b/index.go @@ -0,0 +1,8 @@ +package main + +type MembershipIndex interface { + Has(key Encoder) (bool, error) + Put(key Encoder) error + Drop(key Encoder) error + Close() error +} diff --git a/levigo_index.go b/levigo_index.go new file mode 100644 index 000000000..6fe8a4da1 --- /dev/null +++ b/levigo_index.go @@ -0,0 +1,43 @@ +package main + +import ( + data "github.com/matttproud/prometheus/model/generated" +) + +type LevigoMembershipIndex struct { + persistence *LevigoPersistence + existenceValue Encoder +} + +func (l *LevigoMembershipIndex) Close() error { + return l.persistence.Close() +} + +func (l *LevigoMembershipIndex) Has(key Encoder) (bool, error) { + return l.persistence.Has(key) +} + +func (l *LevigoMembershipIndex) Drop(key Encoder) error { + return l.persistence.Drop(key) +} + +func (l *LevigoMembershipIndex) Put(key Encoder) error { + return l.persistence.Put(key, l.existenceValue) +} + +func NewLevigoMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevigoMembershipIndex, error) { + var levigoPersistence *LevigoPersistence + var levigoPersistenceError error + + existenceValue := NewProtocolBufferEncoder(&data.MembershipIndexValueDDO{}) + + if levigoPersistence, levigoPersistenceError = NewLevigoPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded); levigoPersistenceError == nil { + levigoMembershipIndex := &LevigoMembershipIndex{ + persistence: levigoPersistence, + existenceValue: existenceValue, + } + return levigoMembershipIndex, nil + } + + return nil, levigoPersistenceError +} diff --git a/levigo_metric_persistence.go b/levigo_metric_persistence.go new file mode 100644 index 000000000..d299cb68b --- /dev/null +++ b/levigo_metric_persistence.go @@ -0,0 +1,716 @@ +package main + +import ( + "code.google.com/p/goprotobuf/proto" + "errors" + "fmt" + "github.com/matttproud/prometheus/coding/indexable" + data "github.com/matttproud/prometheus/model/generated" + "github.com/matttproud/prometheus/utility" + "log" + "sort" +) + +type pendingArchival map[int64]float64 + +type LevigoMetricPersistence struct { + fingerprintHighWaterMarks *LevigoPersistence + fingerprintLabelPairs *LevigoPersistence + fingerprintLowWaterMarks *LevigoPersistence + fingerprintSamples *LevigoPersistence + labelNameFingerprints *LevigoPersistence + labelPairFingerprints *LevigoPersistence + metricMembershipIndex *LevigoMembershipIndex +} + +func (l *LevigoMetricPersistence) Close() error { + log.Printf("Closing 
LevigoPersistence storage containers...") + + var persistences = []struct { + name string + closer LevigoCloser + }{ + { + "Fingerprint High-Water Marks", + l.fingerprintHighWaterMarks, + }, + { + "Fingerprint to Label Name and Value Pairs", + l.fingerprintLabelPairs, + }, + { + "Fingerprint Low-Water Marks", + l.fingerprintLowWaterMarks, + }, + { + "Fingerprint Samples", + l.fingerprintSamples, + }, + { + "Label Name to Fingerprints", + l.labelNameFingerprints, + }, + { + "Label Name and Value Pairs to Fingerprints", + l.labelPairFingerprints, + }, + { + "Metric Membership Index", + l.metricMembershipIndex, + }, + } + + errorChannel := make(chan error, len(persistences)) + + for _, persistence := range persistences { + name := persistence.name + closer := persistence.closer + + if closer != nil { + log.Printf("Closing LevigoPersistence storage container: %s\n", name) + closingError := closer.Close() + + if closingError != nil { + log.Printf("Could not close a LevigoPersistence storage container; inconsistencies are possible: %q\n", closingError) + } + + errorChannel <- closingError + } else { + errorChannel <- nil + } + } + + for i := 0; i < cap(errorChannel); i++ { + closingError := <-errorChannel + + if closingError != nil { + return closingError + } + } + + log.Printf("Successfully closed all LevigoPersistence storage containers.") + + return nil +} + +type levigoOpener func() + +func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, error) { + log.Printf("Opening LevigoPersistence storage containers...") + + errorChannel := make(chan error, 7) + + emission := &LevigoMetricPersistence{} + + var subsystemOpeners = []struct { + name string + opener levigoOpener + }{ + { + "High-Water Marks by Fingerprint", + func() { + var anomaly error + emission.fingerprintHighWaterMarks, anomaly = NewLevigoPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10) + errorChannel <- anomaly + }, + }, + { + "Label Names and Value Pairs by Fingerprint", + func() { + var anomaly error + emission.fingerprintLabelPairs, anomaly = NewLevigoPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) + errorChannel <- anomaly + }, + }, + { + "Low-Water Marks by Fingerprint", + func() { + var anomaly error + emission.fingerprintLowWaterMarks, anomaly = NewLevigoPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10) + errorChannel <- anomaly + }, + }, + { + "Samples by Fingerprint", + func() { + var anomaly error + emission.fingerprintSamples, anomaly = NewLevigoPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) + errorChannel <- anomaly + }, + }, + { + "Fingerprints by Label Name", + func() { + var anomaly error + emission.labelNameFingerprints, anomaly = NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) + errorChannel <- anomaly + }, + }, + { + "Fingerprints by Label Name and Value Pair", + func() { + var anomaly error + emission.labelPairFingerprints, anomaly = NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) + errorChannel <- anomaly + }, + }, + { + "Metric Membership Index", + func() { + var anomaly error + emission.metricMembershipIndex, anomaly = NewLevigoMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) + errorChannel <- anomaly + }, + }, + } + + for _, subsystem := range subsystemOpeners { + name := subsystem.name + opener := subsystem.opener + + log.Printf("Opening LevigoPersistence storage container: %s\n", 
name) + + go opener() + } + + for i := 0; i < cap(errorChannel); i++ { + openingError := <-errorChannel + + if openingError != nil { + + log.Printf("Could not open a LevigoPersistence storage container: %q\n", openingError) + + return nil, openingError + } + } + + log.Printf("Successfully opened all LevigoPersistence storage containers.\n") + + return emission, nil +} + +func ddoFromSample(sample *Sample) *data.MetricDDO { + labelNames := make([]string, 0, len(sample.Labels)) + + for labelName, _ := range sample.Labels { + labelNames = append(labelNames, string(labelName)) + } + + sort.Strings(labelNames) + + labelPairs := make([]*data.LabelPairDDO, 0, len(sample.Labels)) + + for _, labelName := range labelNames { + labelValue := sample.Labels[labelName] + labelPair := &data.LabelPairDDO{ + Name: proto.String(string(labelName)), + Value: proto.String(string(labelValue)), + } + + labelPairs = append(labelPairs, labelPair) + } + + metricDDO := &data.MetricDDO{ + LabelPair: labelPairs, + } + + return metricDDO +} + +func ddoFromMetric(metric Metric) *data.MetricDDO { + labelNames := make([]string, 0, len(metric)) + + for labelName, _ := range metric { + labelNames = append(labelNames, string(labelName)) + } + + sort.Strings(labelNames) + + labelPairs := make([]*data.LabelPairDDO, 0, len(metric)) + + for _, labelName := range labelNames { + labelValue := metric[labelName] + labelPair := &data.LabelPairDDO{ + Name: proto.String(string(labelName)), + Value: proto.String(string(labelValue)), + } + + labelPairs = append(labelPairs, labelPair) + } + + metricDDO := &data.MetricDDO{ + LabelPair: labelPairs, + } + + return metricDDO +} + +func fingerprintDDOFromByteArray(fingerprint []byte) *data.FingerprintDDO { + fingerprintDDO := &data.FingerprintDDO{ + Signature: proto.String(string(fingerprint)), + } + + return fingerprintDDO +} + +func (l *LevigoMetricPersistence) hasIndexMetric(ddo *data.MetricDDO) (bool, error) { + ddoKey := NewProtocolBufferEncoder(ddo) + return l.metricMembershipIndex.Has(ddoKey) +} + +func (l *LevigoMetricPersistence) indexMetric(ddo *data.MetricDDO) error { + ddoKey := NewProtocolBufferEncoder(ddo) + return l.metricMembershipIndex.Put(ddoKey) +} + +func fingerprintDDOForMessage(message proto.Message) (*data.FingerprintDDO, error) { + if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil { + fingerprint := FingerprintFromByteArray(messageByteArray) + return &data.FingerprintDDO{ + Signature: proto.String(string(fingerprint)), + }, nil + } else { + return nil, marshalError + } + + return nil, errors.New("Unknown error in generating FingerprintDDO from message.") +} + +func (l *LevigoMetricPersistence) HasLabelPair(ddo *data.LabelPairDDO) (bool, error) { + ddoKey := NewProtocolBufferEncoder(ddo) + return l.labelPairFingerprints.Has(ddoKey) +} + +func (l *LevigoMetricPersistence) HasLabelName(ddo *data.LabelNameDDO) (bool, error) { + ddoKey := NewProtocolBufferEncoder(ddo) + return l.labelNameFingerprints.Has(ddoKey) +} + +func (l *LevigoMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairDDO) (*data.FingerprintCollectionDDO, error) { + ddoKey := NewProtocolBufferEncoder(ddo) + if get, getError := l.labelPairFingerprints.Get(ddoKey); getError == nil { + value := &data.FingerprintCollectionDDO{} + if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { + return value, nil + } else { + return nil, unmarshalError + } + } else { + return nil, getError + } + return nil, errors.New("Unknown error while getting label name and 
value pair fingerprints.") +} + +func (l *LevigoMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameDDO) (*data.FingerprintCollectionDDO, error) { + ddoKey := NewProtocolBufferEncoder(ddo) + if get, getError := l.labelNameFingerprints.Get(ddoKey); getError == nil { + value := &data.FingerprintCollectionDDO{} + if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { + return value, nil + } else { + return nil, unmarshalError + } + } else { + return nil, getError + } + + return nil, errors.New("Unknown error while getting label name fingerprints.") +} + +func (l *LevigoMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPairDDO, fingerprints *data.FingerprintCollectionDDO) error { + labelPairEncoded := NewProtocolBufferEncoder(labelPair) + fingerprintsEncoded := NewProtocolBufferEncoder(fingerprints) + return l.labelPairFingerprints.Put(labelPairEncoded, fingerprintsEncoded) +} + +func (l *LevigoMetricPersistence) setLabelNameFingerprints(labelName *data.LabelNameDDO, fingerprints *data.FingerprintCollectionDDO) error { + labelNameEncoded := NewProtocolBufferEncoder(labelName) + fingerprintsEncoded := NewProtocolBufferEncoder(fingerprints) + return l.labelNameFingerprints.Put(labelNameEncoded, fingerprintsEncoded) +} + +func (l *LevigoMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { + if has, hasError := l.HasLabelPair(labelPair); hasError == nil { + var fingerprints *data.FingerprintCollectionDDO + if has { + if existing, existingError := l.GetLabelPairFingerprints(labelPair); existingError == nil { + fingerprints = existing + } else { + return existingError + } + } else { + fingerprints = &data.FingerprintCollectionDDO{} + } + + fingerprints.Member = append(fingerprints.Member, fingerprint) + + return l.setLabelPairFingerprints(labelPair, fingerprints) + } else { + return hasError + } + + return errors.New("Unknown error when appending fingerprint to label name and value pair.") +} + +func (l *LevigoMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { + labelName := &data.LabelNameDDO{ + Name: labelPair.Name, + } + + if has, hasError := l.HasLabelName(labelName); hasError == nil { + var fingerprints *data.FingerprintCollectionDDO + if has { + if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil { + fingerprints = existing + } else { + return existingError + } + } else { + fingerprints = &data.FingerprintCollectionDDO{} + } + + fingerprints.Member = append(fingerprints.Member, fingerprint) + + return l.setLabelNameFingerprints(labelName, fingerprints) + } else { + return hasError + } + + return errors.New("Unknown error when appending fingerprint to label name and value pair.") +} + +func (l *LevigoMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error { + if fingerprintDDO, fingerprintDDOError := fingerprintDDOForMessage(ddo); fingerprintDDOError == nil { + labelPairCollectionDDO := &data.LabelPairCollectionDDO{ + Member: ddo.LabelPair, + } + fingerprintKey := NewProtocolBufferEncoder(fingerprintDDO) + labelPairCollectionDDOEncoder := NewProtocolBufferEncoder(labelPairCollectionDDO) + + if putError := l.fingerprintLabelPairs.Put(fingerprintKey, labelPairCollectionDDOEncoder); putError == nil { + labelCount := len(ddo.LabelPair) + labelPairErrors := make(chan error, labelCount) + labelNameErrors := make(chan error, labelCount) + + for _, labelPair := range ddo.LabelPair { 
+ go func(labelPair *data.LabelPairDDO) { + labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDDO) + }(labelPair) + + go func(labelPair *data.LabelPairDDO) { + labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDDO) + }(labelPair) + } + + for i := 0; i < cap(labelPairErrors); i++ { + appendError := <-labelPairErrors + + if appendError != nil { + return appendError + } + } + + for i := 0; i < cap(labelNameErrors); i++ { + appendError := <-labelNameErrors + + if appendError != nil { + return appendError + } + } + + return nil + + } else { + return putError + } + } else { + return fingerprintDDOError + } + + return errors.New("Unknown error in appending label pairs to fingerprint.") +} + +func (l *LevigoMetricPersistence) AppendSample(sample *Sample) error { + fmt.Printf("Sample: %q\n", sample) + + metricDDO := ddoFromSample(sample) + + if indexHas, indexHasError := l.hasIndexMetric(metricDDO); indexHasError == nil { + if !indexHas { + if indexPutError := l.indexMetric(metricDDO); indexPutError == nil { + if appendError := l.appendFingerprints(metricDDO); appendError != nil { + log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError) + return appendError + } + } else { + log.Printf("Could not add metric to membership index: %q\n", indexPutError) + return indexPutError + } + } + } else { + log.Printf("Could not query membership index for metric: %q\n", indexHasError) + return indexHasError + } + + if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { + + sampleKeyDDO := &data.SampleKeyDDO{ + Fingerprint: fingerprintDDO, + Timestamp: indexable.EncodeTime(sample.Timestamp), + } + + sampleValueDDO := &data.SampleValueDDO{ + Value: proto.Float32(float32(sample.Value)), + } + + sampleKeyEncoded := NewProtocolBufferEncoder(sampleKeyDDO) + sampleValueEncoded := NewProtocolBufferEncoder(sampleValueDDO) + + if putError := l.fingerprintSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil { + log.Printf("Could not append metric sample: %q\n", putError) + return putError + } + } else { + log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDDOErr) + return fingerprintDDOErr + } + + return nil +} + +func (l *LevigoMetricPersistence) GetLabelNames() ([]string, error) { + if getAll, getAllError := l.labelNameFingerprints.GetAll(); getAllError == nil { + result := make([]string, 0, len(getAll)) + labelNameDDO := &data.LabelNameDDO{} + + for _, pair := range getAll { + if unmarshalError := proto.Unmarshal(pair.Left, labelNameDDO); unmarshalError == nil { + result = append(result, *labelNameDDO.Name) + } else { + return nil, unmarshalError + } + } + + return result, nil + + } else { + return nil, getAllError + } + + return nil, errors.New("Unknown error encountered when querying label names.") +} + +func (l *LevigoMetricPersistence) GetLabelPairs() ([]LabelPairs, error) { + if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { + result := make([]LabelPairs, 0, len(getAll)) + labelPairDDO := &data.LabelPairDDO{} + + for _, pair := range getAll { + if unmarshalError := proto.Unmarshal(pair.Left, labelPairDDO); unmarshalError == nil { + item := LabelPairs{ + *labelPairDDO.Name: *labelPairDDO.Value, + } + result = append(result, item) + } else { + return nil, unmarshalError + } + } + + return result, nil + + } else { + return nil, getAllError + } + + return nil, errors.New("Unknown error encountered when querying label pairs.") +} + +func (l 
*LevigoMetricPersistence) GetMetrics() ([]LabelPairs, error) { + log.Printf("GetMetrics()\n") + + if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { + log.Printf("getAll: %q\n", getAll) + result := make([]LabelPairs, 0) + fingerprintCollection := &data.FingerprintCollectionDDO{} + + fingerprints := make(utility.Set) + + for _, pair := range getAll { + log.Printf("pair: %q\n", pair) + if unmarshalError := proto.Unmarshal(pair.Right, fingerprintCollection); unmarshalError == nil { + for _, member := range fingerprintCollection.Member { + log.Printf("member: %q\n", member) + if !fingerprints.Has(*member.Signature) { + log.Printf("!Has: %q\n", member.Signature) + fingerprints.Add(*member.Signature) + log.Printf("fingerprints %q\n", fingerprints) + fingerprintEncoded := NewProtocolBufferEncoder(member) + if labelPairCollectionRaw, labelPairCollectionRawError := l.fingerprintLabelPairs.Get(fingerprintEncoded); labelPairCollectionRawError == nil { + log.Printf("labelPairCollectionRaw: %q\n", labelPairCollectionRaw) + + labelPairCollectionDDO := &data.LabelPairCollectionDDO{} + + if labelPairCollectionDDOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDDO); labelPairCollectionDDOMarshalError == nil { + intermediate := make(LabelPairs, 0) + + for _, member := range labelPairCollectionDDO.Member { + intermediate[*member.Name] = *member.Value + } + + log.Printf("intermediate: %q\n", intermediate) + + result = append(result, intermediate) + } else { + return nil, labelPairCollectionDDOMarshalError + } + } else { + return nil, labelPairCollectionRawError + } + } + } + } else { + return nil, unmarshalError + } + } + return result, nil + } else { + return nil, getAllError + } + + return nil, errors.New("Unknown error encountered when querying metrics.") +} + +func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interval, int, error) { + metricDDO := ddoFromMetric(metric) + + if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { + if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil { + defer closer.Close() + + start := &data.SampleKeyDDO{ + Fingerprint: fingerprintDDO, + Timestamp: indexable.EarliestTime, + } + + if encode, encodeErr := NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { + iterator.Seek(encode) + + if iterator.Valid() { + found := &data.SampleKeyDDO{} + if unmarshalErr := proto.Unmarshal(iterator.Key(), found); unmarshalErr == nil { + var foundEntries int = 0 + + if *fingerprintDDO.Signature == *found.Fingerprint.Signature { + emission := &Interval{ + OldestInclusive: indexable.DecodeTime(found.Timestamp), + NewestInclusive: indexable.DecodeTime(found.Timestamp), + } + + for iterator = iterator; iterator.Valid(); iterator.Next() { + if subsequentUnmarshalErr := proto.Unmarshal(iterator.Key(), found); subsequentUnmarshalErr == nil { + if *fingerprintDDO.Signature != *found.Fingerprint.Signature { + return emission, foundEntries, nil + } + foundEntries++ + log.Printf("b foundEntries++ %d\n", foundEntries) + emission.NewestInclusive = indexable.DecodeTime(found.Timestamp) + } else { + log.Printf("Could not de-serialize subsequent key: %q\n", subsequentUnmarshalErr) + return nil, -7, subsequentUnmarshalErr + } + } + return emission, foundEntries, nil + } else { + return &Interval{}, -6, nil + } + } else { + log.Printf("Could not de-serialize start key: %q\n", unmarshalErr) + return nil, -5, unmarshalErr + } + } else { + 
iteratorErr := iterator.GetError() + log.Printf("Could not seek for metric watermark beginning: %q\n", iteratorErr) + return nil, -4, iteratorErr + } + } else { + log.Printf("Could not seek for metric watermark: %q\n", encodeErr) + return nil, -3, encodeErr + } + } else { + if closer != nil { + defer closer.Close() + } + + log.Printf("Could not provision iterator for metric: %q\n", iteratorErr) + return nil, -3, iteratorErr + } + } else { + log.Printf("Could not encode metric: %q\n", fingerprintDDOErr) + return nil, -2, fingerprintDDOErr + } + + return nil, -1, errors.New("Unknown error occurred while querying metric watermarks.") +} + +// TODO(mtp): Holes in the data! + +func (l *LevigoMetricPersistence) GetSamplesForMetric(metric Metric, interval Interval) ([]Samples, error) { + metricDDO := ddoFromMetric(metric) + + if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { + if iterator, closer, iteratorErr := l.fingerprintSamples.GetIterator(); iteratorErr == nil { + defer closer.Close() + + start := &data.SampleKeyDDO{ + Fingerprint: fingerprintDDO, + Timestamp: indexable.EncodeTime(interval.OldestInclusive), + } + + emission := make([]Samples, 0) + + if encode, encodeErr := NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { + iterator.Seek(encode) + + for iterator = iterator; iterator.Valid(); iterator.Next() { + key := &data.SampleKeyDDO{} + value := &data.SampleValueDDO{} + if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { + if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { + if *fingerprintDDO.Signature == *key.Fingerprint.Signature { + // Wart + if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() { + emission = append(emission, Samples{ + Value: SampleValue(*value.Value), + Timestamp: indexable.DecodeTime(key.Timestamp), + }) + } else { + break + } + } else { + break + } + } else { + return nil, valueUnmarshalErr + } + } else { + return nil, keyUnmarshalErr + } + } + + return emission, nil + + } else { + log.Printf("Could not encode the start key: %q\n", encodeErr) + return nil, encodeErr + } + } else { + log.Printf("Could not acquire iterator: %q\n", iteratorErr) + return nil, iteratorErr + } + } else { + log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDDOErr) + return nil, fingerprintDDOErr + } + + return nil, errors.New("Unknown error occurred while querying metric watermarks.") +} diff --git a/levigo_metric_persistence_test.go b/levigo_metric_persistence_test.go new file mode 100644 index 000000000..b54a4caec --- /dev/null +++ b/levigo_metric_persistence_test.go @@ -0,0 +1,590 @@ +package main + +import ( + "code.google.com/p/goprotobuf/proto" + "fmt" + data "github.com/matttproud/prometheus/model/generated" + "io/ioutil" + "math" + "math/rand" + "os" + "testing" + "testing/quick" + "time" +) + +func TestBasicLifecycle(t *testing.T) { + temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "levigo_metric_persistence_test") + + if temporaryDirectoryErr != nil { + t.Errorf("Could not create test directory: %q\n", temporaryDirectoryErr) + return + } + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, openErr := NewLevigoMetricPersistence(temporaryDirectory) + + if openErr != nil { + t.Errorf("Could not create Levigo Metric Persistence: %q\n", 
openErr) + } + + if persistence == nil { + t.Errorf("Received nil Levigo Metric Persistence.\n") + return + } + + closeErr := persistence.Close() + + if closeErr != nil { + t.Errorf("Could not close Levigo Metric Persistence: %q\n", closeErr) + } +} + +func TestReadEmpty(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevigoMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + hasLabelPair := func(x int) bool { + name := string(x) + value := string(x) + + ddo := &data.LabelPairDDO{ + Name: proto.String(name), + Value: proto.String(value), + } + + has, hasErr := persistence.HasLabelPair(ddo) + + if hasErr != nil { + return false + } + + return has == false + } + + if hasPairErr := quick.Check(hasLabelPair, nil); hasPairErr != nil { + t.Error(hasPairErr) + } + hasLabelName := func(x int) bool { + name := string(x) + + ddo := &data.LabelNameDDO{ + Name: proto.String(name), + } + + has, hasErr := persistence.HasLabelName(ddo) + + if hasErr != nil { + return false + } + + return has == false + } + + if hasNameErr := quick.Check(hasLabelName, nil); hasNameErr != nil { + t.Error(hasNameErr) + } + + getLabelPairFingerprints := func(x int) bool { + name := string(x) + value := string(x) + + ddo := &data.LabelPairDDO{ + Name: proto.String(name), + Value: proto.String(value), + } + + fingerprints, fingerprintsErr := persistence.GetLabelPairFingerprints(ddo) + + if fingerprintsErr != nil { + return false + } + + if fingerprints == nil { + return false + } + + return len(fingerprints.Member) == 0 + } + + if labelPairFingerprintsErr := quick.Check(getLabelPairFingerprints, nil); labelPairFingerprintsErr != nil { + t.Error(labelPairFingerprintsErr) + } + + getLabelNameFingerprints := func(x int) bool { + name := string(x) + + ddo := &data.LabelNameDDO{ + Name: proto.String(name), + } + + fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(ddo) + + if fingerprintsErr != nil { + return false + } + + if fingerprints == nil { + return false + } + + return len(fingerprints.Member) == 0 + } + + if labelNameFingerprintsErr := quick.Check(getLabelNameFingerprints, nil); labelNameFingerprintsErr != nil { + t.Error(labelNameFingerprintsErr) + } +} + +func TestAppendSampleAsPureSparseAppend(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevigoMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + appendSample := func(x int) bool { + sample := &Sample{ + Value: SampleValue(float32(x)), + Timestamp: time.Unix(int64(x), int64(x)), + Labels: LabelPairs{string(x): string(x)}, + } + + appendErr := persistence.AppendSample(sample) + + return appendErr == nil + } + + if appendErr := quick.Check(appendSample, nil); appendErr != nil { + t.Error(appendErr) + } +} + +func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: 
%q\n", removeAllErr) + } + }() + + persistence, _ := NewLevigoMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + appendSample := func(x int) bool { + sample := &Sample{ + Value: SampleValue(float32(x)), + Timestamp: time.Unix(int64(x), int64(x)), + Labels: LabelPairs{string(x): string(x)}, + } + + appendErr := persistence.AppendSample(sample) + + if appendErr != nil { + return false + } + + labelNameDDO := &data.LabelNameDDO{ + Name: proto.String(string(x)), + } + + hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelNameDDO) + + if hasLabelNameErr != nil { + return false + } + + if !hasLabelName { + return false + } + + labelPairDDO := &data.LabelPairDDO{ + Name: proto.String(string(x)), + Value: proto.String(string(x)), + } + + hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPairDDO) + + if hasLabelPairErr != nil { + return false + } + + if !hasLabelPair { + return false + } + + labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelNameDDO) + + if labelNameFingerprintsErr != nil { + return false + } + + if labelNameFingerprints == nil { + return false + } + + if len(labelNameFingerprints.Member) != 1 { + return false + } + + labelPairFingerprints, labelPairFingerprintsErr := persistence.GetLabelPairFingerprints(labelPairDDO) + + if labelPairFingerprintsErr != nil { + return false + } + + if labelPairFingerprints == nil { + return false + } + + if len(labelPairFingerprints.Member) != 1 { + return false + } + + return true + } + + if appendErr := quick.Check(appendSample, nil); appendErr != nil { + t.Error(appendErr) + } +} + +func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevigoMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + appendSample := func(x int) bool { + sample := &Sample{ + Value: SampleValue(float32(x)), + Timestamp: time.Unix(int64(x), 0), + Labels: LabelPairs{"name": "my_metric"}, + } + + appendErr := persistence.AppendSample(sample) + + return appendErr == nil + } + + if appendErr := quick.Check(appendSample, nil); appendErr != nil { + t.Error(appendErr) + } +} + +func TestStochastic(t *testing.T) { + stochastic := func(x int) bool { + seed := rand.NewSource(int64(x)) + random := rand.New(seed) + numberOfMetrics := random.Intn(5) + 1 + numberOfSharedLabels := random.Intn(5) + numberOfUnsharedLabels := random.Intn(5) + numberOfSamples := random.Intn(1024) + 2 + numberOfRangeScans := random.Intn(3) + + temporaryDirectory, _ := ioutil.TempDir("", "levigo_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevigoMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + metricTimestamps := make(map[int]map[int64]bool) + metricEarliestSample := make(map[int]int64) + metricNewestSample := make(map[int]int64) + + for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { + sample := &Sample{ + Labels: LabelPairs{}, + } + + sample.Labels["name"] = fmt.Sprintf("metric_index_%d", metricIndex) + + for sharedLabelIndex := 0; sharedLabelIndex < 
numberOfSharedLabels; sharedLabelIndex++ { + sample.Labels[fmt.Sprintf("shared_label_%d", sharedLabelIndex)] = fmt.Sprintf("label_%d", sharedLabelIndex) + } + + for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ { + sample.Labels[fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)] = fmt.Sprintf("private_label_%d", unsharedLabelIndex) + } + + timestamps := make(map[int64]bool) + metricTimestamps[metricIndex] = timestamps + var newestSample int64 = math.MinInt64 + var oldestSample int64 = math.MaxInt64 + var nextTimestamp func() int64 + + nextTimestamp = func() int64 { + var candidate int64 + candidate = random.Int63n(math.MaxInt32 - 1) + + if _, has := timestamps[candidate]; has { + candidate = nextTimestamp() + } + + timestamps[candidate] = true + + if candidate < oldestSample { + oldestSample = candidate + } + + if candidate > newestSample { + newestSample = candidate + } + + return candidate + } + + for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ { + sample.Timestamp = time.Unix(nextTimestamp(), 0) + sample.Value = SampleValue(sampleIndex) + + appendErr := persistence.AppendSample(sample) + + if appendErr != nil { + return false + } + } + + metricEarliestSample[metricIndex] = oldestSample + metricNewestSample[metricIndex] = newestSample + + for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ { + labelPair := &data.LabelPairDDO{ + Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), + Value: proto.String(fmt.Sprintf("label_%d", sharedLabelIndex)), + } + + hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPair) + + if hasLabelPairErr != nil { + return false + } + + if hasLabelPair != true { + return false + } + + labelName := &data.LabelNameDDO{ + Name: proto.String(fmt.Sprintf("shared_label_%d", sharedLabelIndex)), + } + + hasLabelName, hasLabelNameErr := persistence.HasLabelName(labelName) + + if hasLabelNameErr != nil { + return false + } + + if hasLabelName != true { + return false + } + } + } + + for sharedIndex := 0; sharedIndex < numberOfSharedLabels; sharedIndex++ { + labelName := &data.LabelNameDDO{ + Name: proto.String(fmt.Sprintf("shared_label_%d", sharedIndex)), + } + fingerprints, fingerprintsErr := persistence.GetLabelNameFingerprints(labelName) + + if fingerprintsErr != nil { + return false + } + + if fingerprints == nil { + return false + } + + if len(fingerprints.Member) != numberOfMetrics { + return false + } + } + + for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { + for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ { + labelPair := &data.LabelPairDDO{ + Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), + Value: proto.String(fmt.Sprintf("private_label_%d", unsharedLabelIndex)), + } + + hasLabelPair, hasLabelPairErr := persistence.HasLabelPair(labelPair) + + if hasLabelPairErr != nil { + return false + } + + if hasLabelPair != true { + return false + } + + labelPairFingerprints, labelPairFingerprintsErr := persistence.GetLabelPairFingerprints(labelPair) + + if labelPairFingerprintsErr != nil { + return false + } + + if labelPairFingerprints == nil { + return false + } + + if len(labelPairFingerprints.Member) != 1 { + return false + } + + labelName := &data.LabelNameDDO{ + Name: proto.String(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex)), + } + + hasLabelName, 
hasLabelNameErr := persistence.HasLabelName(labelName) + + if hasLabelNameErr != nil { + return false + } + + if hasLabelName != true { + return false + } + + labelNameFingerprints, labelNameFingerprintsErr := persistence.GetLabelNameFingerprints(labelName) + + if labelNameFingerprintsErr != nil { + return false + } + + if labelNameFingerprints == nil { + return false + } + + if len(labelNameFingerprints.Member) != 1 { + return false + } + } + + metric := make(Metric) + + metric["name"] = fmt.Sprintf("metric_index_%d", metricIndex) + + for i := 0; i < numberOfSharedLabels; i++ { + metric[fmt.Sprintf("shared_label_%d", i)] = fmt.Sprintf("label_%d", i) + } + + for i := 0; i < numberOfUnsharedLabels; i++ { + metric[fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i)] = fmt.Sprintf("private_label_%d", i) + } + + watermarks, count, watermarksErr := persistence.GetWatermarksForMetric(metric) + + if watermarksErr != nil { + return false + } + + if watermarks == nil { + return false + } + + if count != numberOfSamples { + return false + } + + minimum := metricEarliestSample[metricIndex] + maximum := metricNewestSample[metricIndex] + spread := maximum - minimum + + for i := 0; i < numberOfRangeScans; i++ { + timestamps := metricTimestamps[metricIndex] + + var first int64 = 0 + var second int64 = 0 + + for { + first = minimum + random.Int63n(spread) + if _, has := timestamps[first]; has { + break + } + } + + for { + second = minimum + random.Int63n(spread) + if _, has := timestamps[second]; has && second != first { + break + } + } + + var begin int64 = 0 + var end int64 = 0 + + if first > second { + begin = second + end = first + } else { + begin = first + end = second + } + + interval := Interval{ + OldestInclusive: time.Unix(begin, 0), + NewestInclusive: time.Unix(end, 0), + } + rangeValues, rangeErr := persistence.GetSamplesForMetric(metric, interval) + + if rangeErr != nil { + return false + } + + if len(rangeValues) < 2 { + return false + } + } + } + + return true + } + + if stochasticError := quick.Check(stochastic, nil); stochasticError != nil { + t.Error(stochasticError) + } +} diff --git a/levigo_persistence.go b/levigo_persistence.go new file mode 100644 index 000000000..99eaae1f1 --- /dev/null +++ b/levigo_persistence.go @@ -0,0 +1,216 @@ +package main + +import ( + "github.com/jmhodges/levigo" + "io" +) + +type LevigoCloser interface { + Close() error +} + +type LevigoPersistence struct { + cache *levigo.Cache + filterPolicy *levigo.FilterPolicy + options *levigo.Options + storage *levigo.DB + readOptions *levigo.ReadOptions + writeOptions *levigo.WriteOptions +} + +func NewLevigoPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevigoPersistence, error) { + options := levigo.NewOptions() + options.SetCreateIfMissing(true) + options.SetParanoidChecks(true) + + cache := levigo.NewLRUCache(cacheCapacity) + options.SetCache(cache) + + filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded) + options.SetFilterPolicy(filterPolicy) + + storage, openErr := levigo.Open(storageRoot, options) + + readOptions := levigo.NewReadOptions() + writeOptions := levigo.NewWriteOptions() + writeOptions.SetSync(true) + + emission := &LevigoPersistence{ + cache: cache, + filterPolicy: filterPolicy, + options: options, + readOptions: readOptions, + storage: storage, + writeOptions: writeOptions, + } + + return emission, openErr +} + +func (l *LevigoPersistence) Close() error { + if l.storage != nil { + l.storage.Close() + } + + defer func() { + if 
l.filterPolicy != nil { + l.filterPolicy.Close() + } + }() + + defer func() { + if l.cache != nil { + l.cache.Close() + } + }() + + defer func() { + if l.options != nil { + l.options.Close() + } + }() + + defer func() { + if l.readOptions != nil { + l.readOptions.Close() + } + }() + + defer func() { + if l.writeOptions != nil { + l.writeOptions.Close() + } + }() + + return nil +} + +func (l *LevigoPersistence) Get(value Encoder) ([]byte, error) { + var key []byte + var keyError error + + if key, keyError = value.Encode(); keyError == nil { + return l.storage.Get(l.readOptions, key) + } + + return nil, keyError +} + +func (l *LevigoPersistence) Has(value Encoder) (bool, error) { + if value, getError := l.Get(value); getError != nil { + return false, getError + } else if value == nil { + return false, nil + } + + return true, nil +} + +func (l *LevigoPersistence) Drop(value Encoder) error { + var key []byte + var keyError error + + if key, keyError = value.Encode(); keyError == nil { + + if deleteError := l.storage.Delete(l.writeOptions, key); deleteError != nil { + return deleteError + } + + return nil + } + + return keyError +} + +func (l *LevigoPersistence) Put(key Encoder, value Encoder) error { + var keyEncoded []byte + var keyError error + + if keyEncoded, keyError = key.Encode(); keyError == nil { + if valueEncoded, valueError := value.Encode(); valueError == nil { + + if putError := l.storage.Put(l.writeOptions, keyEncoded, valueEncoded); putError != nil { + return putError + } + } else { + return valueError + } + + return nil + } + + return keyError +} + +func (l *LevigoPersistence) GetAll() ([]Pair, error) { + snapshot := l.storage.NewSnapshot() + defer l.storage.ReleaseSnapshot(snapshot) + readOptions := levigo.NewReadOptions() + defer readOptions.Close() + + readOptions.SetSnapshot(snapshot) + iterator := l.storage.NewIterator(readOptions) + defer iterator.Close() + iterator.SeekToFirst() + + result := make([]Pair, 0) + + for iterator := iterator; iterator.Valid(); iterator.Next() { + result = append(result, Pair{Left: iterator.Key(), Right: iterator.Value()}) + } + + iteratorError := iterator.GetError() + + if iteratorError != nil { + return nil, iteratorError + } + + return result, nil +} + +type iteratorCloser struct { + iterator *levigo.Iterator + readOptions *levigo.ReadOptions + snapshot *levigo.Snapshot + storage *levigo.DB +} + +func (i *iteratorCloser) Close() error { + defer func() { + if i.storage != nil { + if i.snapshot != nil { + i.storage.ReleaseSnapshot(i.snapshot) + } + } + }() + + defer func() { + if i.iterator != nil { + i.iterator.Close() + } + }() + + defer func() { + if i.readOptions != nil { + i.readOptions.Close() + } + }() + + return nil +} + +func (l *LevigoPersistence) GetIterator() (*levigo.Iterator, io.Closer, error) { + snapshot := l.storage.NewSnapshot() + readOptions := levigo.NewReadOptions() + readOptions.SetSnapshot(snapshot) + iterator := l.storage.NewIterator(readOptions) + + closer := &iteratorCloser{ + iterator: iterator, + readOptions: readOptions, + snapshot: snapshot, + storage: l.storage, + } + + return iterator, closer, nil +} diff --git a/main.go b/main.go new file mode 100644 index 000000000..a1d4dd0b8 --- /dev/null +++ b/main.go @@ -0,0 +1,16 @@ +package main + +import ( + "code.google.com/p/gorest" + "net/http" +) + +func main() { + m, _ := NewLevigoMetricPersistence("/tmp/metrics") + s := &MetricsService{ + persistence: m, + } + gorest.RegisterService(s) + http.Handle("/", gorest.Handle()) + http.ListenAndServe(":8787", nil) +} 
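For orientation, here is a rough usage sketch (not part of the commit itself) of how the Levigo-backed storage that main.go wires into the gorest service on port 8787 can be exercised directly. It assumes the Sample, LabelPairs, Metric, and Interval types defined in metric.go further down in this diff; the directory path, label values, and the exampleDirectUsage name are illustrative placeholders only.

```go
package main

import (
	"fmt"
	"time"
)

// exampleDirectUsage is a hypothetical sketch: append one sample and read it
// back over an interval that contains its timestamp.
func exampleDirectUsage() error {
	persistence, err := NewLevigoMetricPersistence("/tmp/metrics-example") // placeholder path
	if err != nil {
		return err
	}
	defer persistence.Close()

	now := time.Now()

	sample := &Sample{
		Labels:    LabelPairs{"name": "requests", "job": "frontend"},
		Value:     SampleValue(42),
		Timestamp: now,
	}
	if err := persistence.AppendSample(sample); err != nil {
		return err
	}

	metric := Metric{"name": "requests", "job": "frontend"}
	interval := Interval{
		OldestInclusive: now.Add(-time.Minute),
		NewestInclusive: now.Add(time.Minute),
	}
	samples, err := persistence.GetSamplesForMetric(metric, interval)
	if err != nil {
		return err
	}
	fmt.Println(samples)

	return nil
}
```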
diff --git a/metric.go b/metric.go new file mode 100644 index 000000000..0d8110c5a --- /dev/null +++ b/metric.go @@ -0,0 +1,43 @@ +package main + +import ( + "crypto/md5" + "encoding/hex" + "io" + "time" +) + +type Fingerprint string + +type LabelPairs map[string]string +type Metric map[string]string + +type SampleValue float32 + +type Sample struct { + Labels LabelPairs + Value SampleValue + Timestamp time.Time +} + +type Samples struct { + Value SampleValue + Timestamp time.Time +} + +type Interval struct { + OldestInclusive time.Time + NewestInclusive time.Time +} + +func FingerprintFromString(value string) Fingerprint { + hash := md5.New() + io.WriteString(hash, value) + return Fingerprint(hex.EncodeToString(hash.Sum([]byte{}))) +} + +func FingerprintFromByteArray(value []byte) Fingerprint { + hash := md5.New() + hash.Write(value) + return Fingerprint(hex.EncodeToString(hash.Sum([]byte{}))) +} diff --git a/metric_persistence.go b/metric_persistence.go new file mode 100644 index 000000000..ffc0668ae --- /dev/null +++ b/metric_persistence.go @@ -0,0 +1,15 @@ +package main + +type MetricPersistence interface { + Close() error + AppendSample(sample *Sample) error + + GetLabelNames() ([]string, error) + GetLabelPairs() ([]LabelPairs, error) + GetMetrics() ([]LabelPairs, error) + + GetMetricFingerprintsForLabelPairs(labelSets []*LabelPairs) ([]*Fingerprint, error) + RecordLabelNameFingerprint(sample *Sample) error + RecordFingerprintWatermark(sample *Sample) error + GetFingerprintLabelPairs(fingerprint Fingerprint) (LabelPairs, error) +} diff --git a/model/Makefile b/model/Makefile new file mode 100644 index 000000000..38e14cbe0 --- /dev/null +++ b/model/Makefile @@ -0,0 +1,10 @@ +export PATH := $(PATH):/Users/mtp/Development/go/bin + +all: data.proto + +data.proto: + protoc --go_out=generated/ data.proto +clean: + rm -rf generated/* + +.PHONY: data.proto diff --git a/model/data.proto b/model/data.proto new file mode 100644 index 000000000..cd8c2289e --- /dev/null +++ b/model/data.proto @@ -0,0 +1,86 @@ +package generated; + +message LabelPairDDO { + optional int64 version = 1 [default = 1]; + + optional string name = 2; + optional string value = 3; +} + +message MetricDDO { + optional int64 version = 1 [default = 1]; + + repeated LabelPairDDO label_pair = 2; +} + +message FingerprintCollectionDDO { + optional int64 version = 1 [default = 1]; + + repeated FingerprintDDO member = 2; +} + +message LabelPairCollectionDDO { + optional int64 version = 1 [default = 1]; + + repeated LabelPairDDO member = 2; +} + +message LabelNameDDO { + optional int64 version = 1 [default = 1]; + + optional string name = 2; +} + +message FingerprintDDO { + optional int64 version = 1 [default = 1]; + + // bytes + optional string signature = 2; +} + +message WatermarkDDO { + optional int64 version = 1 [default = 1]; + + optional int64 timestamp = 2; +} + +message SampleKeyDDO { + optional int64 version = 1 [default = 1]; + + optional FingerprintDDO fingerprint = 2; + optional bytes timestamp = 3; +} + +message SampleValueDDO { + optional int64 version = 1 [default = 1]; + + optional float value = 2; +} + + +// TOO OLD + +message MembershipIndexValueDDO { +} + + +message LabelNameAndValueIndexDDO { + optional string name = 1; + optional string value = 2; +} + +message LabelNameValuesDDO { + repeated string value = 1; +} + +message LabelNameAndValueToMetricDDO { + repeated string metric = 1; +} + +message MetricToWindowDDO { + repeated int64 window = 1; +} + +message WindowSampleDDO { + repeated float value = 1; +} 
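The model/Makefile above compiles data.proto into a Go package under model/generated. The sketch below shows how one of these DDO messages might be built and serialized with goprotobuf before being handed to the storage layer. The dto import path is a placeholder assumption for wherever model/generated resolves in your GOPATH, and the pointer-valued fields populated via proto.String follow goprotobuf's usual code-generation conventions rather than anything confirmed in this diff.

package main

// Illustrative sketch only: constructing a data.proto DDO message and
// marshaling it with goprotobuf. The dto import path is a placeholder.

import (
    "fmt"

    "code.google.com/p/goprotobuf/proto"

    dto "model/generated" // placeholder import path; produced by `make -C model`
)

func main() {
    // The version field carries a default of 1, so only the payload fields
    // need to be set explicitly here.
    pair := &dto.LabelPairDDO{
        Name:  proto.String("name"),
        Value: proto.String("requests"),
    }

    raw, err := proto.Marshal(pair)
    if err != nil {
        fmt.Println("marshal:", err)
        return
    }

    // raw is the kind of byte slice that would be handed to the LevelDB layer,
    // e.g. via an Encoder implementation such as ProtocolBufferEncoder
    // (defined below in protocol_buffer_encoder.go).
    fmt.Printf("%d bytes\n", len(raw))
}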
diff --git a/persistence.go b/persistence.go new file mode 100644 index 000000000..b249ebb8d --- /dev/null +++ b/persistence.go @@ -0,0 +1,15 @@ +package main + +type Pair struct { + Left []byte + Right []byte +} + +type Persistence interface { + Has(key Encoder) (bool, error) + Get(key Encoder) ([]byte, error) + GetAll() ([]Pair, error) + Drop(key Encoder) error + Put(key Encoder, value Encoder) error + Close() error +} diff --git a/protocol_buffer_encoder.go b/protocol_buffer_encoder.go new file mode 100644 index 000000000..e9b7a3741 --- /dev/null +++ b/protocol_buffer_encoder.go @@ -0,0 +1,19 @@ +package main + +import ( + "code.google.com/p/goprotobuf/proto" +) + +type ProtocolBufferEncoder struct { + message proto.Message +} + +func (p *ProtocolBufferEncoder) Encode() ([]byte, error) { + return proto.Marshal(p.message) +} + +func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder { + return &ProtocolBufferEncoder{ + message: message, + } +} diff --git a/service.go b/service.go new file mode 100644 index 000000000..fa268dc7a --- /dev/null +++ b/service.go @@ -0,0 +1,57 @@ +package main + +import ( + "code.google.com/p/gorest" +) + +type MetricsService struct { + gorest.RestService `root:"/" consumes:"application/json" produces:"application/json"` + + persistence *LevigoMetricPersistence + + listLabels gorest.EndPoint `method:"GET" path:"/labels/" output:"[]string"` + listLabelPairs gorest.EndPoint `method:"GET" path:"/label-pairs/" output:"[]LabelPairs"` + listMetrics gorest.EndPoint `method:"GET" path:"/metrics/" output:"[]LabelPairs"` + + appendSample gorest.EndPoint `method:"POST" path:"/metrics/" postdata:"Sample"` +} + +func (m MetricsService) ListLabels() []string { + labels, labelsError := m.persistence.GetLabelNames() + + if labelsError != nil { + m.ResponseBuilder().SetResponseCode(500) + } + + return labels +} + +func (m MetricsService) ListLabelPairs() []LabelPairs { + labelPairs, labelPairsError := m.persistence.GetLabelPairs() + + if labelPairsError != nil { + m.ResponseBuilder().SetResponseCode(500) + } + + return labelPairs +} + +func (m MetricsService) ListMetrics() []LabelPairs { + metrics, metricsError := m.persistence.GetMetrics() + + if metricsError != nil { + m.ResponseBuilder().SetResponseCode(500) + } + + return metrics +} + +func (m MetricsService) AppendSample(s Sample) { + responseBuilder := m.ResponseBuilder() + if appendError := m.persistence.AppendSample(&s); appendError == nil { + responseBuilder.SetResponseCode(200) + return + } + + responseBuilder.SetResponseCode(500) +} diff --git a/utility/set.go b/utility/set.go new file mode 100644 index 000000000..8f7af725d --- /dev/null +++ b/utility/set.go @@ -0,0 +1,39 @@ +package utility + +type Set map[interface{}]bool + +func (s Set) Add(v interface{}) { + s[v] = true +} + +func (s Set) Remove(v interface{}) { + delete(s, v) +} + +func (s Set) Elements() []interface{} { + result := make([]interface{}, 0, len(s)) + + for k, _ := range s { + result = append(result, k) + } + + return result +} + +func (s Set) Has(v interface{}) bool { + _, p := s[v] + + return p +} + +func (s Set) Intersection(o Set) Set { + result := make(Set) + + for k, _ := range s { + if o.Has(k) { + result[k] = true + } + } + + return result +} diff --git a/utility/set_test.go b/utility/set_test.go new file mode 100644 index 000000000..188d06eef --- /dev/null +++ b/utility/set_test.go @@ -0,0 +1,109 @@ +package utility + +import ( + "testing" + "testing/quick" +) + +func TestSetEqualMemberships(t *testing.T) { + f := 
func(x int) bool { + first := make(Set) + second := make(Set) + + first.Add(x) + second.Add(x) + + intersection := first.Intersection(second) + + members := intersection.Elements() + + return members != nil && len(members) == 1 && members[0] == x + } + + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + +func TestSetInequalMemberships(t *testing.T) { + f := func(x int) bool { + first := make(Set) + second := make(Set) + + first.Add(x) + + intersection := first.Intersection(second) + + members := intersection.Elements() + + return members != nil && len(members) == 0 + } + + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + +func TestSetAsymmetricMemberships(t *testing.T) { + f := func(x int) bool { + first := make(Set) + second := make(Set) + + first.Add(x) + second.Add(x) + first.Add(x + 1) + second.Add(x + 1) + second.Add(x + 2) + first.Add(x + 2) + first.Add(x + 3) + second.Add(x + 4) + + intersection := first.Intersection(second) + + members := intersection.Elements() + + return members != nil && len(members) == 3 + } + + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + +func TestSetRemoval(t *testing.T) { + f := func(x int) bool { + first := make(Set) + + first.Add(x) + first.Remove(x) + + members := first.Elements() + + return members != nil && len(members) == 0 + } + + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + +func TestSetAdditionAndRemoval(t *testing.T) { + f := func(x int) bool { + first := make(Set) + second := make(Set) + + first.Add(x) + second.Add(x) + first.Add(x + 1) + first.Remove(x + 1) + + intersection := first.Intersection(second) + members := intersection.Elements() + + return members != nil && len(members) == 1 && members[0] == x + } + + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +}
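To round out the tests above, here is a small usage sketch of the utility.Set type. The import path is a placeholder assumption for wherever the utility package resolves relative to this repository's root.

package main

// Illustrative sketch only: basic use of utility.Set outside the tests.

import (
    "fmt"

    "utility" // placeholder import path
)

func main() {
    frontends := make(utility.Set)
    backends := make(utility.Set)

    frontends.Add("host-a")
    frontends.Add("host-b")
    backends.Add("host-b")
    backends.Add("host-c")

    // Intersection keeps only the members present in both sets.
    both := frontends.Intersection(backends)

    fmt.Println(both.Has("host-b"))   // true
    fmt.Println(len(both.Elements())) // 1
}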