Initial directory re-arrangement for storage.

Matt T. Proud 2012-11-26 19:56:51 +01:00
parent 8af1458b39
commit 2bbdaa5790
15 changed files with 117 additions and 875 deletions

View file

@ -1,61 +0,0 @@
// {
// set evaluation_interval = "30s";
// target "http://www.example.com:80/metrics"
// rule archived {name="instance:requests_total:sum"} = sum by (instance) {name="requests"};
// rule archived {name="instance:requests-by_result_code_total:sum"} =
// sum by (instance,result_code) {name="requests"};
// rule archived {name="instance:requests-by_result_code:sum"} =
// {name="instances:requests-by_result_code"}
// / by (instance)
// {name="instances:requests_total:sum"};
// }
{
set evaluation_interval = "2m";
permanent {
rule {
labels {
set name = "process:request_rate_qps-allowed:sum";
set job = "frontend";
}
// Values may be a literal or an expression.
set value = 500;
// I wonder: Is it practical to express labels similar to above in a better DSL?
// set value = EXPRESSION … WITH LABELS {foo="bar"};
}
}
rule {
// This provides a way of overriding existing labels, unsetting them, or
// appending new ones.
labels {
// "name" is obligatory.
set name = "process:requests_total:sum";
}
// Here we are extracting a metric with the name label value of "requests" from
// job "frontend" and merely accumulating it into a named variable. It is
// similar to a standing query. Each sum is keyed to the UNIX process and job
// from which it came.
set value = SUM({name="requests", job="frontend"}) BY (process, job);
}
rule {
// This provides a way of overriding existing labels, unsetting them, or
// appending new ones.
labels {
// "name" is obligatory.
set name = "process:request_qps:rate5m";
}
// Here we are taking the previously recorded metric "process:requests_total:sum"
// from job "frontend" and computing its five-minute rate; it, too, behaves like
// a standing query.
set value = RATE({name="process:requests_total:sum", job="frontend"} OVER "5m");
}
}
}
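
For orientation, the grouped-sum semantics that the comments above describe (accumulate a metric's samples keyed by a set of labels, as in SUM({...}) BY (process, job)) can be sketched in plain Go. The types and helper below are hypothetical illustrations only; they are not part of this commit or of the eventual rule evaluator.

package main

import "fmt"

// labelSet and sample are hypothetical stand-ins for the real model types.
type labelSet map[string]string

type sample struct {
	labels labelSet
	value  float64
}

// sumBy accumulates sample values grouped by the given label names,
// mirroring the intent of SUM({...}) BY (process, job) above.
func sumBy(samples []sample, by []string) map[string]float64 {
	sums := map[string]float64{}
	for _, s := range samples {
		key := ""
		for _, name := range by {
			key += name + "=" + s.labels[name] + ";"
		}
		sums[key] += s.value
	}
	return sums
}

func main() {
	samples := []sample{
		{labelSet{"name": "requests", "job": "frontend", "process": "a"}, 3},
		{labelSet{"name": "requests", "job": "frontend", "process": "a"}, 2},
		{labelSet{"name": "requests", "job": "frontend", "process": "b"}, 7},
	}
	// Prints two groups: process=a;job=frontend; -> 5 and process=b;job=frontend; -> 7.
	fmt.Println(sumBy(samples, []string{"process", "job"}))
}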

View file

@ -1,506 +0,0 @@
package main
import (
"fmt"
"strings"
"unicode"
"unicode/utf8"
)
type itemType int
const (
itemError itemType = iota
itemEof
itemRuleContextOpen
itemRuleContextClose
itemSetKeyword
itemKey
itemEqual
itemValue
itemSemicolon
itemQuote
itemRulesKeyword
itemRuleKeyword
itemRuleName
itemRuleValue
itemOpenBracket
itemCloseBracket
)
const (
literalCloseBracket = "}"
literalEof = -1
literalOpenBracket = "{"
literalRule = "rule"
literalRules = "rules"
literalSet = "set"
literalEqual = "="
literalQuote = `"`
literalSemicolon = ";"
)
type element struct {
itemType itemType
value string
}
func (e element) String() string {
switch e.itemType {
case itemError:
return e.value
case itemEof:
return "EOF"
}
return fmt.Sprintf("%s %q", e.itemType, e.value)
}
type lexer struct {
input string
elements chan element
start int
position int
runeWidth int
}
func lex(name, input string) (*lexer, chan element) {
l := &lexer{
input: input,
elements: make(chan element),
}
go l.run()
return l, l.elements
}
func (l *lexer) run() {
for state := lexBody; state != nil; {
state = state(l)
}
close(l.elements)
}
func (l *lexer) next() (rune rune) {
if l.position >= len(l.input) {
l.runeWidth = 0
return literalEof
}
rune, l.runeWidth = utf8.DecodeRuneInString(l.input[l.position:])
l.position += l.runeWidth
return rune
}
func lexBody(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalOpenBracket) {
return lexRulesetOpen
}
switch rune := l.next(); {
case rune == literalEof:
l.emit(itemEof)
return nil
case isSpace(rune):
l.ignore()
default:
return l.errorf("illegal input")
}
return lexBody
}
func lexRulesetOpen(l *lexer) lexFunction {
l.position += len(literalOpenBracket)
l.emit(itemRuleContextOpen)
return lexRulesetInside
}
func lexRulesetInside(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalCloseBracket) {
return lexRulesetClose
}
if strings.HasPrefix(l.input[l.position:], literalSet) {
return lexRuleSetKeyword
}
if strings.HasPrefix(l.input[l.position:], literalRules) {
return lexRulesetRules
}
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("unterminated ruleset")
case isSpace(rune):
l.ignore()
case rune == ';':
l.ignore()
default:
return l.errorf("unrecognized input")
}
return lexRulesetInside
}
func lexRulesetRules(l *lexer) lexFunction {
l.position += len(literalRules)
l.emit(itemRulesKeyword)
return lexRulesetRulesBlockOpen
}
func lexRulesetRulesBlockOpen(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalOpenBracket) {
l.position += len(literalOpenBracket)
l.emit(itemOpenBracket)
return lexRulesetRulesBlockInside
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unrecognized input")
}
return lexRulesetRulesBlockOpen
}
func lexRulesetRulesBlockInside(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalRule) {
return lexRulesetRuleBegin
}
if strings.HasPrefix(l.input[l.position:], literalCloseBracket) {
return lexRulesetRulesBlockClose
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unrecognized input")
}
return lexRulesetRulesBlockInside
}
func lexRulesetRulesBlockClose(l *lexer) lexFunction {
l.position += len(literalCloseBracket)
l.emit(itemCloseBracket)
return lexRulesetInside
}
func lexRulesetRuleBegin(l *lexer) lexFunction {
l.position += len(literalRule)
l.emit(itemRuleKeyword)
return lexRulesetRuleName
}
func lexRulesetRuleName(l *lexer) lexFunction {
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
case isIdentifierOpen(rune):
for {
switch rune := l.next(); {
case isMetricIdentifier(rune):
case rune == '=':
l.backup()
l.emit(itemRuleName)
return lexRulesetRuleEqual
default:
return l.errorf("bad rule name")
}
}
default:
return l.errorf("unrecognized input")
}
return lexRulesetRuleName
}
func lexRulesetRuleEqual(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalEqual) {
l.position += len(literalEqual)
l.emit(itemEqual)
return lexRulesetRuleDefinitionBegin
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unrecognized input")
}
return lexRulesetRuleEqual
}
func lexRulesetRuleDefinitionBegin(l *lexer) lexFunction {
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
case isIdentifierOpen(rune):
for {
switch rune := l.next(); {
case isMetricIdentifier(rune):
case rune == ';':
l.emit(itemRuleValue)
return lexRulesetRulesBlockInside
default:
return l.errorf("unrecognized input")
}
}
default:
return l.errorf("unrecognized input")
}
return lexRulesetRuleDefinitionBegin
}
func lexRuleSetKeyword(l *lexer) lexFunction {
l.position += len(literalSet)
l.emit(itemSetKeyword)
return lexRuleSetInside
}
func (l *lexer) backup() {
l.position -= l.runeWidth
}
func isIdentifierOpen(rune rune) bool {
switch rune := rune; {
case unicode.IsLetter(rune):
return true
case rune == '_':
return true
}
return false
}
func lexRuleSetInside(l *lexer) lexFunction {
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("unterminated set statement")
case isSpace(rune):
l.ignore()
case rune == ';':
return l.errorf("unexpected ;")
case rune == '=':
return l.errorf("unexpected =")
case isIdentifierOpen(rune):
l.backup()
return lexRuleSetKey
default:
return l.errorf("unrecognized input")
}
return lexRuleSetInside
}
func isIdentifier(rune rune) bool {
switch rune := rune; {
case isIdentifierOpen(rune):
return true
case unicode.IsDigit(rune):
return true
}
return false
}
func isMetricIdentifier(rune rune) bool {
switch rune := rune; {
case isIdentifier(rune):
return true
case rune == ':':
return true
}
return false
}
func (l *lexer) peek() rune {
rune := l.next()
l.backup()
return rune
}
func (l *lexer) atTerminator() bool {
switch rune := l.peek(); {
case isSpace(rune):
return true
case rune == ';':
return true
}
return false
}
func lexRuleSetKey(l *lexer) lexFunction {
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("incomplete set statement")
case isIdentifier(rune):
default:
l.backup()
if !l.atTerminator() {
return l.errorf("unexpected character %+U %q", rune, rune)
}
l.emit(itemKey)
return lexRuleSetEqual
}
return lexRuleSetKey
}
func lexRuleSetEqual(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalEqual) {
l.position += len(literalEqual)
l.emit(itemEqual)
return lexRuleSetValueOpenQuote
}
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("incomplete set statement")
case isSpace(rune):
l.ignore()
default:
return l.errorf("unexpected character %+U %q", rune, rune)
}
return lexRuleSetEqual
}
func lexRuleSetValueOpenQuote(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalQuote) {
l.position += len(literalQuote)
l.emit(itemQuote)
return lexRuleSetValue
}
switch rune := l.next(); {
case rune == literalEof:
return l.errorf("incomplete set statement")
case isSpace(rune):
l.ignore()
default:
return l.errorf("unexpected character %+U %q", rune, rune)
}
return lexRuleSetValueOpenQuote
}
func lexRuleSetValue(l *lexer) lexFunction {
var lastRuneEscapes bool = false
for {
rune := l.next()
{
if rune == '"' && !lastRuneEscapes {
l.backup()
l.emit(itemValue)
return lexRuleSetValueCloseQuote
}
if !lastRuneEscapes && rune == '\\' {
lastRuneEscapes = true
} else {
lastRuneEscapes = false
}
}
}
panic("unreachable")
}
func lexRuleSetValueCloseQuote(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalQuote) {
l.position += len(literalQuote)
l.emit(itemQuote)
return lexRuleSetSemicolon
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unexpected character %+U %q", rune, rune)
}
return lexRuleSetValueCloseQuote
}
func lexRuleSetSemicolon(l *lexer) lexFunction {
if strings.HasPrefix(l.input[l.position:], literalSemicolon) {
l.position += len(literalSemicolon)
l.emit(itemSemicolon)
return lexRulesetInside
}
switch rune := l.next(); {
case isSpace(rune):
l.ignore()
default:
return l.errorf("unexpected character %+U %q", rune, rune)
}
return lexRuleSetSemicolon
}
func (l *lexer) ignore() {
l.start = l.position
}
func (l *lexer) errorf(format string, args ...interface{}) lexFunction {
l.elements <- element{itemError, fmt.Sprintf(format, args...)}
return nil
}
func isSpace(rune rune) bool {
switch rune {
case ' ', '\t', '\n', '\r':
return true
}
return false
}
func lexRulesetClose(l *lexer) lexFunction {
l.position += len(literalCloseBracket)
l.emit(itemCloseBracket)
return lexBody
}
func (l *lexer) emit(i itemType) {
l.elements <- element{i, l.input[l.start:l.position]}
l.start = l.position
}
type lexFunction func(*lexer) lexFunction
func main() {
in := `{
set evaluation_interval = "10m";
rules {
}
set name = "your mom";
}
{
set evaluation_interval = "30m";
}`
fmt.Println(in)
_, v := lex("", in)
for value := range v {
fmt.Println(value)
}
}
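
A consumer of this lexer would presumably drain the elements channel until it sees itemEof or itemError, much as main above does. The fragment below is a hypothetical sketch, not part of this commit; it assumes it lives in the same package and relies only on the element and itemType definitions and the fmt import already present in this file.

// Hypothetical consumer sketch: collect elements until the lexer signals
// EOF or an error.
func drain(elements chan element) ([]element, error) {
	collected := make([]element, 0)
	for e := range elements {
		switch e.itemType {
		case itemError:
			return collected, fmt.Errorf("lex error: %s", e.value)
		case itemEof:
			return collected, nil
		default:
			collected = append(collected, e)
		}
	}
	return collected, nil
}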

View file

@ -1,192 +0,0 @@
package main
import (
"testing"
)
type lexTest struct {
name string
input string
elements []element
}
var (
tEof = element{itemEof, ""}
tOpenRuleset = element{itemOpenBracket, "{"}
tCloseBracket = element{itemCloseBracket, "}"}
tIllegal = element{itemError, "illegal input"}
tSet = element{itemSetKeyword, "set"}
tEqual = element{itemEqual, "="}
tQuote = element{itemQuote, `"`}
tSemicolon = element{itemSemicolon, ";"}
tRules = element{itemRulesKeyword, "rules"}
)
var lexTests = []lexTest{
{
"empty",
"",
[]element{
tEof,
},
},
{
"empty with new line",
"\n\n",
[]element{
tEof,
},
},
{
"one empty ruleset",
"{}",
[]element{
tOpenRuleset,
tCloseBracket,
tEof,
},
},
{
"one empty ruleset distributed over new line",
"{\n}",
[]element{
tOpenRuleset,
tCloseBracket,
tEof,
},
},
{
"two empty rulesets",
"{} {}",
[]element{
tOpenRuleset,
tCloseBracket,
tOpenRuleset,
tCloseBracket,
tEof,
},
},
{
"two empty rulesets distributed over new line",
"{}\n{}",
[]element{
tOpenRuleset,
tCloseBracket,
tOpenRuleset,
tCloseBracket,
tEof,
},
},
{
"garbage",
"garbage",
[]element{tIllegal},
},
{
"one set",
`{ set foo = "bar"; }`,
[]element{
tOpenRuleset,
tSet,
element{itemKey, "foo"},
tEqual,
tQuote,
element{itemValue, "bar"},
tQuote,
tSemicolon,
tCloseBracket,
tEof,
},
},
{
"one set over multiple lines",
`{
set
foo
=
"bar"
;
}`,
[]element{tOpenRuleset, tSet, element{itemKey, "foo"}, tEqual, tQuote, element{itemValue, "bar"}, tQuote, tSemicolon, tCloseBracket, tEof},
},
{
"two sets",
`{ set foo = "bar";set baz = "qux"; }`,
[]element{
tOpenRuleset,
tSet,
element{itemKey, "foo"},
tEqual,
tQuote,
element{itemValue, "bar"},
tQuote,
tSemicolon,
tSet,
element{itemKey, "baz"},
tEqual,
tQuote,
element{itemValue, "qux"},
tQuote,
tSemicolon,
tCloseBracket,
tEof,
},
},
{
"two over multiple lines",
`{ set foo = "bar";
set
baz
=
"qux"
;
}`,
[]element{
tOpenRuleset,
tSet,
element{itemKey, "foo"},
tEqual,
tQuote,
element{itemValue, "bar"},
tQuote,
tSemicolon,
tSet,
element{itemKey, "baz"},
tEqual,
tQuote,
element{itemValue, "qux"},
tQuote,
tSemicolon,
tCloseBracket,
tEof,
},
},
}
func collect(l *lexTest) []element {
_, v := lex("", l.input)
emission := make([]element, 0)
for i := range v {
emission = append(emission, i)
}
return emission
}
func TestFoo(t *testing.T) {
for _, test := range lexTests {
e := collect(&test)
if len(e) != len(test.elements) {
t.Errorf("%s: got\n\n\t%v\nexpected\n\n\t%v", test.name, e, test.elements)
}
for i, _ := range test.elements {
if test.elements[i] != e[i] {
t.Errorf("%s[%d]: got\n\n\t%v\nexpected\n\n\t%v", test.name, i, e[i], test.elements[i])
}
}
}
}

View file

@ -2,11 +2,12 @@ package main
import (
"code.google.com/p/gorest"
"github.com/matttproud/prometheus/storage/metric/leveldb"
"net/http"
)
func main() {
m, _ := NewLevigoMetricPersistence("/tmp/metrics")
m, _ := leveldb.NewLevigoMetricPersistence("/tmp/metrics")
s := &MetricsService{
persistence: m,
}
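
The wiring above discards the error returned by leveldb.NewLevigoMetricPersistence. A fuller, hypothetical variant of the same setup might check it before constructing the service; the sketch below uses only the names visible in this diff and is not part of the change.

// Hypothetical sketch; assumes the same package as main above.
func newMetricsService(baseDirectory string) (*MetricsService, error) {
	persistence, err := leveldb.NewLevigoMetricPersistence(baseDirectory)
	if err != nil {
		return nil, err
	}
	return &MetricsService{persistence: persistence}, nil
}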

View file

@ -1,15 +0,0 @@
package main
type MetricPersistence interface {
Close() error
AppendSample(sample *Sample) error
GetLabelNames() ([]string, error)
GetLabelPairs() ([]LabelPairs, error)
GetMetrics() ([]LabelPairs, error)
GetMetricFingerprintsForLabelPairs(labelSets []*LabelPairs) ([]*Fingerprint, error)
RecordLabelNameFingerprint(sample *Sample) error
RecordFingerprintWatermark(sample *Sample) error
GetFingerprintLabelPairs(fingerprint Fingerprint) (LabelPairs, error)
}

View file

@ -1,4 +1,4 @@
package main
package model
import (
"crypto/md5"

View file

@ -2,18 +2,20 @@ package main
import (
"code.google.com/p/gorest"
"github.com/matttproud/prometheus/model"
"github.com/matttproud/prometheus/storage/metric/leveldb"
)
type MetricsService struct {
gorest.RestService `root:"/" consumes:"application/json" produces:"application/json"`
persistence *LevigoMetricPersistence
persistence *leveldb.LevigoMetricPersistence
listLabels gorest.EndPoint `method:"GET" path:"/labels/" output:"[]string"`
listLabelPairs gorest.EndPoint `method:"GET" path:"/label-pairs/" output:"[]LabelPairs"`
listMetrics gorest.EndPoint `method:"GET" path:"/metrics/" output:"[]LabelPairs"`
listLabelPairs gorest.EndPoint `method:"GET" path:"/label-pairs/" output:"[]model.LabelPairs"`
listMetrics gorest.EndPoint `method:"GET" path:"/metrics/" output:"[]model.LabelPairs"`
appendSample gorest.EndPoint `method:"POST" path:"/metrics/" postdata:"Sample"`
appendSample gorest.EndPoint `method:"POST" path:"/metrics/" postdata:"model.Sample"`
}
func (m MetricsService) ListLabels() []string {
@ -26,7 +28,7 @@ func (m MetricsService) ListLabels() []string {
return labels
}
func (m MetricsService) ListLabelPairs() []LabelPairs {
func (m MetricsService) ListLabelPairs() []model.LabelPairs {
labelPairs, labelPairsError := m.persistence.GetLabelPairs()
if labelPairsError != nil {
@ -36,7 +38,7 @@ func (m MetricsService) ListLabelPairs() []LabelPairs {
return labelPairs
}
func (m MetricsService) ListMetrics() []LabelPairs {
func (m MetricsService) ListMetrics() []model.LabelPairs {
metrics, metricsError := m.persistence.GetMetrics()
if metricsError != nil {
@ -46,7 +48,7 @@ func (m MetricsService) ListMetrics() []LabelPairs {
return metrics
}
func (m MetricsService) AppendSample(s Sample) {
func (m MetricsService) AppendSample(s model.Sample) {
responseBuilder := m.ResponseBuilder()
if appendError := m.persistence.AppendSample(&s); appendError == nil {
responseBuilder.SetResponseCode(200)

View file

@ -0,0 +1,19 @@
package metric
import (
"github.com/matttproud/prometheus/model"
)
type MetricPersistence interface {
Close() error
AppendSample(sample *model.Sample) error
GetLabelNames() ([]string, error)
GetLabelPairs() ([]model.LabelPairs, error)
GetMetrics() ([]model.LabelPairs, error)
GetMetricFingerprintsForLabelPairs(labelSets []*model.LabelPairs) ([]*model.Fingerprint, error)
RecordLabelNameFingerprint(sample *model.Sample) error
RecordFingerprintWatermark(sample *model.Sample) error
GetFingerprintLabelPairs(fingerprint model.Fingerprint) (model.LabelPairs, error)
}
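
As a usage illustration only (not part of the change), a caller might append a sample through this interface roughly as follows. The sample fields match those used in the tests elsewhere in this commit; the function and package names are hypothetical.

// Hypothetical usage sketch of the MetricPersistence interface.
package main

import (
	"time"

	"github.com/matttproud/prometheus/model"
	"github.com/matttproud/prometheus/storage/metric"
)

// recordOne appends a single sample through whatever implementation is
// supplied, e.g. the leveldb-backed one introduced by this commit.
func recordOne(p metric.MetricPersistence) error {
	sample := &model.Sample{
		Labels:    model.LabelPairs{"name": "requests", "job": "frontend"},
		Timestamp: time.Now(),
		Value:     model.SampleValue(1),
	}
	return p.AppendSample(sample)
}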

View file

@ -1,4 +1,4 @@
package main
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
@ -6,30 +6,34 @@ import (
"fmt"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model"
data "github.com/matttproud/prometheus/model/generated"
index "github.com/matttproud/prometheus/storage/raw/index/leveldb"
storage "github.com/matttproud/prometheus/storage/raw/leveldb"
"github.com/matttproud/prometheus/utility"
"io"
"log"
"sort"
)
type pendingArchival map[int64]float64
type LevigoMetricPersistence struct {
fingerprintHighWaterMarks *LevigoPersistence
fingerprintLabelPairs *LevigoPersistence
fingerprintLowWaterMarks *LevigoPersistence
fingerprintSamples *LevigoPersistence
labelNameFingerprints *LevigoPersistence
labelPairFingerprints *LevigoPersistence
metricMembershipIndex *LevigoMembershipIndex
fingerprintHighWaterMarks *storage.LevigoPersistence
fingerprintLabelPairs *storage.LevigoPersistence
fingerprintLowWaterMarks *storage.LevigoPersistence
fingerprintSamples *storage.LevigoPersistence
labelNameFingerprints *storage.LevigoPersistence
labelPairFingerprints *storage.LevigoPersistence
metricMembershipIndex *index.LevigoMembershipIndex
}
type levigoOpener func()
func (l *LevigoMetricPersistence) Close() error {
log.Printf("Closing LevigoPersistence storage containers...")
var persistences = []struct {
name string
closer LevigoCloser
closer io.Closer
}{
{
"Fingerprint High-Water Marks",
@ -94,8 +98,6 @@ func (l *LevigoMetricPersistence) Close() error {
return nil
}
type levigoOpener func()
func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, error) {
log.Printf("Opening LevigoPersistence storage containers...")
@ -111,7 +113,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence,
"High-Water Marks by Fingerprint",
func() {
var anomaly error
emission.fingerprintHighWaterMarks, anomaly = NewLevigoPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10)
emission.fingerprintHighWaterMarks, anomaly = storage.NewLevigoPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10)
errorChannel <- anomaly
},
},
@ -119,7 +121,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence,
"Label Names and Value Pairs by Fingerprint",
func() {
var anomaly error
emission.fingerprintLabelPairs, anomaly = NewLevigoPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10)
emission.fingerprintLabelPairs, anomaly = storage.NewLevigoPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10)
errorChannel <- anomaly
},
},
@ -127,7 +129,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence,
"Low-Water Marks by Fingerprint",
func() {
var anomaly error
emission.fingerprintLowWaterMarks, anomaly = NewLevigoPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10)
emission.fingerprintLowWaterMarks, anomaly = storage.NewLevigoPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10)
errorChannel <- anomaly
},
},
@ -135,7 +137,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence,
"Samples by Fingerprint",
func() {
var anomaly error
emission.fingerprintSamples, anomaly = NewLevigoPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10)
emission.fingerprintSamples, anomaly = storage.NewLevigoPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10)
errorChannel <- anomaly
},
},
@ -143,7 +145,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence,
"Fingerprints by Label Name",
func() {
var anomaly error
emission.labelNameFingerprints, anomaly = NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10)
emission.labelNameFingerprints, anomaly = storage.NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10)
errorChannel <- anomaly
},
},
@ -151,7 +153,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence,
"Fingerprints by Label Name and Value Pair",
func() {
var anomaly error
emission.labelPairFingerprints, anomaly = NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10)
emission.labelPairFingerprints, anomaly = storage.NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10)
errorChannel <- anomaly
},
},
@ -159,7 +161,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence,
"Metric Membership Index",
func() {
var anomaly error
emission.metricMembershipIndex, anomaly = NewLevigoMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10)
emission.metricMembershipIndex, anomaly = index.NewLevigoMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10)
errorChannel <- anomaly
},
},
@ -190,7 +192,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence,
return emission, nil
}
func ddoFromSample(sample *Sample) *data.MetricDDO {
func ddoFromSample(sample *model.Sample) *data.MetricDDO {
labelNames := make([]string, 0, len(sample.Labels))
for labelName, _ := range sample.Labels {
@ -218,7 +220,7 @@ func ddoFromSample(sample *Sample) *data.MetricDDO {
return metricDDO
}
func ddoFromMetric(metric Metric) *data.MetricDDO {
func ddoFromMetric(metric model.Metric) *data.MetricDDO {
labelNames := make([]string, 0, len(metric))
for labelName, _ := range metric {
@ -266,7 +268,7 @@ func (l *LevigoMetricPersistence) indexMetric(ddo *data.MetricDDO) error {
func fingerprintDDOForMessage(message proto.Message) (*data.FingerprintDDO, error) {
if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil {
fingerprint := FingerprintFromByteArray(messageByteArray)
fingerprint := model.FingerprintFromByteArray(messageByteArray)
return &data.FingerprintDDO{
Signature: proto.String(string(fingerprint)),
}, nil
@ -431,7 +433,7 @@ func (l *LevigoMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error
return errors.New("Unknown error in appending label pairs to fingerprint.")
}
func (l *LevigoMetricPersistence) AppendSample(sample *Sample) error {
func (l *LevigoMetricPersistence) AppendSample(sample *model.Sample) error {
fmt.Printf("Sample: %q\n", sample)
metricDDO := ddoFromSample(sample)
@ -501,14 +503,14 @@ func (l *LevigoMetricPersistence) GetLabelNames() ([]string, error) {
return nil, errors.New("Unknown error encountered when querying label names.")
}
func (l *LevigoMetricPersistence) GetLabelPairs() ([]LabelPairs, error) {
func (l *LevigoMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) {
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil {
result := make([]LabelPairs, 0, len(getAll))
result := make([]model.LabelPairs, 0, len(getAll))
labelPairDDO := &data.LabelPairDDO{}
for _, pair := range getAll {
if unmarshalError := proto.Unmarshal(pair.Left, labelPairDDO); unmarshalError == nil {
item := LabelPairs{
item := model.LabelPairs{
*labelPairDDO.Name: *labelPairDDO.Value,
}
result = append(result, item)
@ -526,12 +528,12 @@ func (l *LevigoMetricPersistence) GetLabelPairs() ([]LabelPairs, error) {
return nil, errors.New("Unknown error encountered when querying label pairs.")
}
func (l *LevigoMetricPersistence) GetMetrics() ([]LabelPairs, error) {
func (l *LevigoMetricPersistence) GetMetrics() ([]model.LabelPairs, error) {
log.Printf("GetMetrics()\n")
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil {
log.Printf("getAll: %q\n", getAll)
result := make([]LabelPairs, 0)
result := make([]model.LabelPairs, 0)
fingerprintCollection := &data.FingerprintCollectionDDO{}
fingerprints := make(utility.Set)
@ -552,7 +554,7 @@ func (l *LevigoMetricPersistence) GetMetrics() ([]LabelPairs, error) {
labelPairCollectionDDO := &data.LabelPairCollectionDDO{}
if labelPairCollectionDDOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDDO); labelPairCollectionDDOMarshalError == nil {
intermediate := make(LabelPairs, 0)
intermediate := make(model.LabelPairs, 0)
for _, member := range labelPairCollectionDDO.Member {
intermediate[*member.Name] = *member.Value
@ -581,7 +583,7 @@ func (l *LevigoMetricPersistence) GetMetrics() ([]LabelPairs, error) {
return nil, errors.New("Unknown error encountered when querying metrics.")
}
func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interval, int, error) {
func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric model.Metric) (*model.Interval, int, error) {
metricDDO := ddoFromMetric(metric)
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
@ -602,7 +604,7 @@ func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interv
var foundEntries int = 0
if *fingerprintDDO.Signature == *found.Fingerprint.Signature {
emission := &Interval{
emission := &model.Interval{
OldestInclusive: indexable.DecodeTime(found.Timestamp),
NewestInclusive: indexable.DecodeTime(found.Timestamp),
}
@ -622,7 +624,7 @@ func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interv
}
return emission, foundEntries, nil
} else {
return &Interval{}, -6, nil
return &model.Interval{}, -6, nil
}
} else {
log.Printf("Could not de-serialize start key: %q\n", unmarshalErr)
@ -655,7 +657,7 @@ func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interv
// TODO(mtp): Holes in the data!
func (l *LevigoMetricPersistence) GetSamplesForMetric(metric Metric, interval Interval) ([]Samples, error) {
func (l *LevigoMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) {
metricDDO := ddoFromMetric(metric)
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil {
@ -667,7 +669,7 @@ func (l *LevigoMetricPersistence) GetSamplesForMetric(metric Metric, interval In
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
emission := make([]Samples, 0)
emission := make([]model.Samples, 0)
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
@ -680,8 +682,8 @@ func (l *LevigoMetricPersistence) GetSamplesForMetric(metric Metric, interval In
if *fingerprintDDO.Signature == *key.Fingerprint.Signature {
// Wart
if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() {
emission = append(emission, Samples{
Value: SampleValue(*value.Value),
emission = append(emission, model.Samples{
Value: model.SampleValue(*value.Value),
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {

View file

@ -1,8 +1,9 @@
package main
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
"github.com/matttproud/prometheus/model"
data "github.com/matttproud/prometheus/model/generated"
"io/ioutil"
"math"
@ -168,10 +169,10 @@ func TestAppendSampleAsPureSparseAppend(t *testing.T) {
}()
appendSample := func(x int) bool {
sample := &Sample{
Value: SampleValue(float32(x)),
sample := &model.Sample{
Value: model.SampleValue(float32(x)),
Timestamp: time.Unix(int64(x), int64(x)),
Labels: LabelPairs{string(x): string(x)},
Labels: model.LabelPairs{string(x): string(x)},
}
appendErr := persistence.AppendSample(sample)
@ -200,10 +201,10 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) {
}()
appendSample := func(x int) bool {
sample := &Sample{
Value: SampleValue(float32(x)),
sample := &model.Sample{
Value: model.SampleValue(float32(x)),
Timestamp: time.Unix(int64(x), int64(x)),
Labels: LabelPairs{string(x): string(x)},
Labels: model.LabelPairs{string(x): string(x)},
}
appendErr := persistence.AppendSample(sample)
@ -293,10 +294,10 @@ func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) {
}()
appendSample := func(x int) bool {
sample := &Sample{
Value: SampleValue(float32(x)),
sample := &model.Sample{
Value: model.SampleValue(float32(x)),
Timestamp: time.Unix(int64(x), 0),
Labels: LabelPairs{"name": "my_metric"},
Labels: model.LabelPairs{"name": "my_metric"},
}
appendErr := persistence.AppendSample(sample)
@ -338,8 +339,8 @@ func TestStochastic(t *testing.T) {
metricNewestSample := make(map[int]int64)
for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ {
sample := &Sample{
Labels: LabelPairs{},
sample := &model.Sample{
Labels: model.LabelPairs{},
}
sample.Labels["name"] = fmt.Sprintf("metric_index_%d", metricIndex)
@ -381,7 +382,7 @@ func TestStochastic(t *testing.T) {
for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ {
sample.Timestamp = time.Unix(nextTimestamp(), 0)
sample.Value = SampleValue(sampleIndex)
sample.Value = model.SampleValue(sampleIndex)
appendErr := persistence.AppendSample(sample)
@ -504,7 +505,7 @@ func TestStochastic(t *testing.T) {
}
}
metric := make(Metric)
metric := make(model.Metric)
metric["name"] = fmt.Sprintf("metric_index_%d", metricIndex)
@ -565,7 +566,7 @@ func TestStochastic(t *testing.T) {
end = second
}
interval := Interval{
interval := model.Interval{
OldestInclusive: time.Unix(begin, 0),
NewestInclusive: time.Unix(end, 0),
}

View file

@ -1,4 +1,4 @@
package main
package index
import (
"github.com/matttproud/prometheus/coding"

View file

@ -1,8 +1,9 @@
package main
package leveldb
import (
"github.com/matttproud/prometheus/coding"
data "github.com/matttproud/prometheus/model/generated"
"github.com/matttproud/prometheus/storage/raw/leveldb"
)
var (
@ -10,7 +11,7 @@ var (
)
type LevigoMembershipIndex struct {
persistence *LevigoPersistence
persistence *leveldb.LevigoPersistence
}
func (l *LevigoMembershipIndex) Close() error {
@ -30,10 +31,10 @@ func (l *LevigoMembershipIndex) Put(key coding.Encoder) error {
}
func NewLevigoMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevigoMembershipIndex, error) {
var levigoPersistence *LevigoPersistence
var levigoPersistence *leveldb.LevigoPersistence
var levigoPersistenceError error
if levigoPersistence, levigoPersistenceError = NewLevigoPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded); levigoPersistenceError == nil {
if levigoPersistence, levigoPersistenceError = leveldb.NewLevigoPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded); levigoPersistenceError == nil {
levigoMembershipIndex := &LevigoMembershipIndex{
persistence: levigoPersistence,
}

View file

@ -1,4 +1,4 @@
package main
package raw
import (
"github.com/matttproud/prometheus/coding"

View file

@ -1,15 +1,12 @@
package main
package leveldb
import (
"github.com/jmhodges/levigo"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/storage/raw"
"io"
)
type LevigoCloser interface {
Close() error
}
type LevigoPersistence struct {
cache *levigo.Cache
filterPolicy *levigo.FilterPolicy
@ -19,6 +16,13 @@ type LevigoPersistence struct {
writeOptions *levigo.WriteOptions
}
type iteratorCloser struct {
iterator *levigo.Iterator
readOptions *levigo.ReadOptions
snapshot *levigo.Snapshot
storage *levigo.DB
}
func NewLevigoPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevigoPersistence, error) {
options := levigo.NewOptions()
options.SetCreateIfMissing(true)
@ -87,14 +91,13 @@ func (l *LevigoPersistence) Close() error {
}
func (l *LevigoPersistence) Get(value coding.Encoder) ([]byte, error) {
var key []byte
var keyError error
if key, keyError = value.Encode(); keyError == nil {
if key, keyError := value.Encode(); keyError == nil {
return l.storage.Get(l.readOptions, key)
} else {
return nil, keyError
}
return nil, keyError
panic("unreachable")
}
func (l *LevigoPersistence) Has(value coding.Encoder) (bool, error) {
@ -108,26 +111,20 @@ func (l *LevigoPersistence) Has(value coding.Encoder) (bool, error) {
}
func (l *LevigoPersistence) Drop(value coding.Encoder) error {
var key []byte
var keyError error
if key, keyError = value.Encode(); keyError == nil {
if key, keyError := value.Encode(); keyError == nil {
if deleteError := l.storage.Delete(l.writeOptions, key); deleteError != nil {
return deleteError
}
return nil
} else {
return keyError
}
return keyError
return nil
}
func (l *LevigoPersistence) Put(key, value coding.Encoder) error {
var keyEncoded []byte
var keyError error
if keyEncoded, keyError = key.Encode(); keyError == nil {
if keyEncoded, keyError := key.Encode(); keyError == nil {
if valueEncoded, valueError := value.Encode(); valueError == nil {
if putError := l.storage.Put(l.writeOptions, keyEncoded, valueEncoded); putError != nil {
@ -136,14 +133,14 @@ func (l *LevigoPersistence) Put(key, value coding.Encoder) error {
} else {
return valueError
}
return nil
} else {
return keyError
}
return keyError
return nil
}
func (l *LevigoPersistence) GetAll() ([]Pair, error) {
func (l *LevigoPersistence) GetAll() ([]raw.Pair, error) {
snapshot := l.storage.NewSnapshot()
defer l.storage.ReleaseSnapshot(snapshot)
readOptions := levigo.NewReadOptions()
@ -154,10 +151,10 @@ func (l *LevigoPersistence) GetAll() ([]Pair, error) {
defer iterator.Close()
iterator.SeekToFirst()
result := make([]Pair, 0)
result := make([]raw.Pair, 0)
for iterator := iterator; iterator.Valid(); iterator.Next() {
result = append(result, Pair{Left: iterator.Key(), Right: iterator.Value()})
result = append(result, raw.Pair{Left: iterator.Key(), Right: iterator.Value()})
}
iteratorError := iterator.GetError()
@ -169,13 +166,6 @@ func (l *LevigoPersistence) GetAll() ([]Pair, error) {
return result, nil
}
type iteratorCloser struct {
iterator *levigo.Iterator
readOptions *levigo.ReadOptions
snapshot *levigo.Snapshot
storage *levigo.DB
}
func (i *iteratorCloser) Close() error {
defer func() {
if i.storage != nil {