Test data for the curator.

This commit is contained in:
Matt T. Proud 2013-03-26 12:33:48 +01:00
parent 7034c3981f
commit c53a72a894
10 changed files with 469 additions and 161 deletions

View file

@ -22,6 +22,8 @@ build:
$(MAKE) -C model $(MAKE) -C model
$(MAKE) -C web $(MAKE) -C web
go build ./... go build ./...
binary: build
go build -o prometheus.build go build -o prometheus.build
clean: clean:
@ -44,4 +46,4 @@ search_index:
documentation: search_index documentation: search_index
godoc -http=:6060 -index -index_files='search_index' godoc -http=:6060 -index -index_files='search_index'
.PHONY: advice build clean documentation format search_index test .PHONY: advice binary build clean documentation format search_index test

View file

@ -120,6 +120,10 @@ type fingerprint struct {
lastCharacterOfLastLabelValue string lastCharacterOfLastLabelValue string
} }
func (f fingerprint) String() string {
return f.ToRowKey()
}
func (f fingerprint) ToRowKey() string { func (f fingerprint) ToRowKey() string {
return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, rowKeyDelimiter) return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, rowKeyDelimiter)
} }

View file

@ -14,6 +14,7 @@
package model package model
import ( import (
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"time" "time"
) )
@ -24,6 +25,14 @@ type Watermark struct {
time.Time time.Time
} }
// ToMetricHighWatermarkDTO builds a MetricHighWatermark DTO out of a given
// Watermark.
func (w Watermark) ToMetricHighWatermarkDTO() *dto.MetricHighWatermark {
return &dto.MetricHighWatermark{
Timestamp: proto.Int64(w.Time.Unix()),
}
}
// NewWatermarkFromHighWatermarkDTO builds Watermark from the provided // NewWatermarkFromHighWatermarkDTO builds Watermark from the provided
// dto.MetricHighWatermark object. // dto.MetricHighWatermark object.
func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark {
@ -31,3 +40,10 @@ func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark {
time.Unix(*d.Timestamp, 0), time.Unix(*d.Timestamp, 0),
} }
} }
// NewWatermarkFromTime builds a new Watermark for the provided time.
func NewWatermarkFromTime(t time.Time) Watermark {
return Watermark{
t,
}
}

View file

@ -177,10 +177,10 @@ type (
type ( type (
// Matrix literal, i.e. metric name plus labelset and timerange. // Matrix literal, i.e. metric name plus labelset and timerange.
MatrixLiteral struct { MatrixLiteral struct {
labels model.LabelSet labels model.LabelSet
// Fingerprints are populated from labels at query analysis time. // Fingerprints are populated from labels at query analysis time.
fingerprints model.Fingerprints fingerprints model.Fingerprints
interval time.Duration interval time.Duration
} }
) )

View file

@ -64,7 +64,8 @@ func (c curator) run() (err error) {
var ( var (
decoder watermarkDecoder decoder watermarkDecoder
filter = watermarkFilter{ filter = watermarkFilter{
stop: c.stop, stop: c.stop,
curationState: c.curationState,
} }
operator = watermarkOperator{ operator = watermarkOperator{
olderThan: c.cutOff, olderThan: c.cutOff,

View file

@ -14,184 +14,464 @@
package metric package metric
import ( import (
"code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage" dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw" fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
"sort"
"testing" "testing"
"time" "time"
) )
type ( type (
keyPair struct { curationState struct {
fingerprint model.Fingerprint fingerprint string
time time.Time groupSize int
olderThan time.Duration
lastCurated time.Time
} }
fakeCurationStates map[model.Fingerprint]time.Time watermarkState struct {
fakeSamples map[keyPair][]float32 fingerprint string
fakeWatermarks map[model.Fingerprint]time.Time lastAppended time.Time
in struct {
curationStates fakeCurationStates
samples fakeSamples
watermarks fakeWatermarks
cutOff time.Time
grouping uint32
} }
out struct { sample struct {
curationStates fakeCurationStates time time.Time
samples fakeSamples value float32
watermarks fakeWatermarks }
sampleGroup struct {
fingerprint string
values []sample
}
context struct {
curationStates fixture.Pairs
watermarkStates fixture.Pairs
sampleGroups fixture.Pairs
} }
) )
func (c fakeCurationStates) Has(_ coding.Encoder) (bool, error) { func (c curationState) Get() (key, value coding.Encoder) {
panic("unimplemented") key = coding.NewProtocolBufferEncoder(&dto.CurationKey{
} Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(),
MinimumGroupSize: proto.Uint32(uint32(c.groupSize)),
OlderThan: proto.Int64(int64(c.olderThan)),
})
func (c fakeCurationStates) Get(_ coding.Encoder) ([]byte, error) { value = coding.NewProtocolBufferEncoder(&dto.CurationValue{
panic("unimplemented") LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()),
} })
func (c fakeCurationStates) Drop(_ coding.Encoder) error {
panic("unimplemented")
}
func (c fakeCurationStates) Put(_, _ coding.Encoder) error {
panic("unimplemented")
}
func (c fakeCurationStates) Close() error {
panic("unimplemented")
}
func (c fakeCurationStates) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
var (
fingerprints model.Fingerprints
)
for f := range c {
fingerprints = append(fingerprints, f)
}
sort.Sort(fingerprints)
for _, k := range fingerprints {
v := c[k]
var (
decodedKey interface{}
decodedValue interface{}
)
decodedKey, err = d.DecodeKey(k)
if err != nil {
continue
}
decodedValue, err = d.DecodeValue(v)
if err != nil {
continue
}
switch f.Filter(decodedKey, decodedValue) {
case storage.STOP:
return
case storage.SKIP:
continue
case storage.ACCEPT:
opErr := o.Operate(decodedKey, decodedValue)
if opErr != nil {
if opErr.Continuable {
continue
}
break
}
}
}
return return
} }
func (c fakeCurationStates) Commit(_ raw.Batch) error { func (w watermarkState) Get() (key, value coding.Encoder) {
panic("unimplemented") key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
value = coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
return
} }
func (c fakeSamples) Has(_ coding.Encoder) (bool, error) { func (s sampleGroup) Get() (key, value coding.Encoder) {
panic("unimplemented") key = coding.NewProtocolBufferEncoder(&dto.SampleKey{
} Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(),
Timestamp: indexable.EncodeTime(s.values[0].time),
LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()),
SampleCount: proto.Uint32(uint32(len(s.values))),
})
func (c fakeSamples) Get(_ coding.Encoder) ([]byte, error) { series := &dto.SampleValueSeries{}
panic("unimplemented")
}
func (c fakeSamples) Drop(_ coding.Encoder) error { for _, value := range s.values {
panic("unimplemented") series.Value = append(series.Value, &dto.SampleValueSeries_Value{
} Timestamp: proto.Int64(value.time.Unix()),
Value: proto.Float32(float32(value.value)),
})
}
func (c fakeSamples) Put(_, _ coding.Encoder) error { value = coding.NewProtocolBufferEncoder(series)
panic("unimplemented")
}
func (c fakeSamples) Close() error { return
panic("unimplemented")
}
func (c fakeSamples) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
panic("unimplemented")
}
func (c fakeSamples) Commit(_ raw.Batch) (err error) {
panic("unimplemented")
}
func (c fakeWatermarks) Has(_ coding.Encoder) (bool, error) {
panic("unimplemented")
}
func (c fakeWatermarks) Get(_ coding.Encoder) ([]byte, error) {
panic("unimplemented")
}
func (c fakeWatermarks) Drop(_ coding.Encoder) error {
panic("unimplemented")
}
func (c fakeWatermarks) Put(_, _ coding.Encoder) error {
panic("unimplemented")
}
func (c fakeWatermarks) Close() error {
panic("unimplemented")
}
func (c fakeWatermarks) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
panic("unimplemented")
}
func (c fakeWatermarks) Commit(_ raw.Batch) (err error) {
panic("unimplemented")
} }
func TestCurator(t *testing.T) { func TestCurator(t *testing.T) {
var ( var (
scenarios = []struct { scenarios = []struct {
in in context context
out out
}{ }{
{ {
in: in{ context: context{
curationStates: fakeCurationStates{ curationStates: fixture.Pairs{
model.NewFingerprintFromRowKey("0-A-10-Z"): testInstant.Add(5 * time.Minute), curationState{
model.NewFingerprintFromRowKey("1-B-10-A"): testInstant.Add(4 * time.Minute), fingerprint: "0001-A-1-Z",
groupSize: 5,
olderThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
},
curationState{
fingerprint: "0002-A-2-Z",
groupSize: 5,
olderThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
},
},
watermarkStates: fixture.Pairs{
watermarkState{
fingerprint: "0001-A-1-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
watermarkState{
fingerprint: "0002-A-1-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
},
sampleGroups: fixture.Pairs{
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 90 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 85 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 80 * time.Minute),
value: 2,
},
{
time: testInstant.Add(-1 * 75 * time.Minute),
value: 3,
},
{
time: testInstant.Add(-1 * 70 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 65 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 60 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 55 * time.Minute),
value: 2,
},
{
time: testInstant.Add(-1 * 50 * time.Minute),
value: 3,
},
{
time: testInstant.Add(-1 * 45 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 40 * time.Minute),
value: 0,
},
{
time: testInstant.Add(-1 * 35 * time.Minute),
value: 1,
},
{
time: testInstant.Add(-1 * 30 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 25 * time.Minute),
value: 0,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 35 * time.Minute),
value: 1,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: []sample{
{
time: testInstant.Add(-1 * 30 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 90 * time.Minute),
value: 0,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 89 * time.Minute),
value: 1,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 88 * time.Minute),
value: 2,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 87 * time.Minute),
value: 3,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 86 * time.Minute),
value: 4,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 85 * time.Minute),
value: 5,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 84 * time.Minute),
value: 6,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 83 * time.Minute),
value: 7,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 82 * time.Minute),
value: 8,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 81 * time.Minute),
value: 9,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 80 * time.Minute),
value: 10,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 79 * time.Minute),
value: 11,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 78 * time.Minute),
value: 12,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 77 * time.Minute),
value: 13,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 76 * time.Minute),
value: 14,
},
{
time: testInstant.Add(-1 * 75 * time.Minute),
value: 15,
},
{
time: testInstant.Add(-1 * 74 * time.Minute),
value: 16,
},
{
time: testInstant.Add(-1 * 73 * time.Minute),
value: 17,
},
{
time: testInstant.Add(-1 * 72 * time.Minute),
value: 18,
},
{
time: testInstant.Add(-1 * 71 * time.Minute),
value: 19,
},
{
time: testInstant.Add(-1 * 70 * time.Minute),
value: 20,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 69 * time.Minute),
value: 21,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 68 * time.Minute),
value: 22,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 67 * time.Minute),
value: 23,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 66 * time.Minute),
value: 24,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 65 * time.Minute),
value: 25,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 64 * time.Minute),
value: 26,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 63 * time.Minute),
value: 27,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 62 * time.Minute),
value: 28,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 61 * time.Minute),
value: 29,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: []sample{
{
time: testInstant.Add(-1 * 60 * time.Minute),
value: 30,
},
},
},
}, },
watermarks: fakeWatermarks{},
samples: fakeSamples{},
cutOff: testInstant.Add(5 * time.Minute),
grouping: 5,
}, },
}, },
} }
@ -199,15 +479,13 @@ func TestCurator(t *testing.T) {
for _, scenario := range scenarios { for _, scenario := range scenarios {
var ( var (
in = scenario.in curatorDirectory = fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates))
watermarkDirectory = fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates))
curationStates = in.curationStates sampleDirectory = fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups))
samples = in.samples
watermarks = in.watermarks
cutOff = in.cutOff
grouping = in.grouping
) )
defer curatorDirectory.Close()
defer watermarkDirectory.Close()
defer sampleDirectory.Close()
_ = newCurator(cutOff, grouping, curationStates, samples, watermarks)
} }
} }

View file

@ -63,6 +63,8 @@ var (
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
} }
curationDuration = metrics.NewCounter()
curationDurations = metrics.NewHistogram(diskLatencyHistogram)
storageOperations = metrics.NewCounter() storageOperations = metrics.NewCounter()
storageOperationDurations = metrics.NewCounter() storageOperationDurations = metrics.NewCounter()
storageLatency = metrics.NewHistogram(diskLatencyHistogram) storageLatency = metrics.NewHistogram(diskLatencyHistogram)

View file

@ -21,7 +21,9 @@ import (
) )
var ( var (
testInstant = time.Time{} // ``hg clone https://code.google.com/p/go ; cd go ; hg log | tail -n 20``
usEastern, _ = time.LoadLocation("US/Eastern")
testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern)
) )
func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) { func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) {

View file

@ -14,7 +14,6 @@
package metric package metric
import ( import (
"fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
"sort" "sort"
@ -380,9 +379,7 @@ func testMakeView(t test.Tester) {
} }
} }
start := time.Now()
tiered.Flush() tiered.Flush()
fmt.Printf("Took %s to flush %d items...\n", time.Since(start), len(scenario.data))
requestBuilder := NewViewRequestBuilder() requestBuilder := NewViewRequestBuilder()

View file

@ -75,6 +75,12 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory
defer t.Close() defer t.Close()
p.tester.Fatal(err) p.tester.Fatal(err)
} }
defer func() {
err = persistence.Close()
if err != nil {
p.tester.Fatal(err)
}
}()
for f.HasNext() { for f.HasNext() {
var ( var (