Checkpoint.

This commit is contained in:
Matt T. Proud 2013-03-01 09:51:36 -08:00
parent 41068c2e84
commit f39b9c3c8e
7 changed files with 284 additions and 110 deletions

View file

@ -21,8 +21,15 @@ type ProtocolBufferEncoder struct {
message proto.Message message proto.Message
} }
func (p *ProtocolBufferEncoder) Encode() ([]byte, error) { func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) {
return proto.Marshal(p.message) raw, err = proto.Marshal(p.message)
// XXX: Adjust legacy users of this to not check for error.
if err != nil {
panic(err)
}
return
} }
func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder { func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder {

29
main.go
View file

@ -15,8 +15,10 @@ package main
import ( import (
"flag" "flag"
"fmt"
"github.com/prometheus/prometheus/appstate" "github.com/prometheus/prometheus/appstate"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/retrieval/format" "github.com/prometheus/prometheus/retrieval/format"
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
@ -31,6 +33,8 @@ import (
// Commandline flags. // Commandline flags.
var ( var (
_ = fmt.Sprintf("")
configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.") configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.")
metricsStoragePath = flag.String("metricsStoragePath", "/tmp/metrics", "Base path for metrics storage.") metricsStoragePath = flag.String("metricsStoragePath", "/tmp/metrics", "Base path for metrics storage.")
scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.") scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.")
@ -92,20 +96,39 @@ func main() {
ts := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20) ts := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20)
go ts.Serve() go ts.Serve()
go ts.Expose()
go func() {
ticker := time.Tick(time.Second)
for i := 0; i < 5; i++ {
<-ticker
if i%10 == 0 {
fmt.Printf(".")
}
}
fmt.Println()
//f := model.NewFingerprintFromRowKey("9776005627788788740-g-131-0")
f := model.NewFingerprintFromRowKey("09923616460706181007-g-131-0")
v := metric.NewViewRequestBuilder()
v.GetMetricAtTime(f, time.Now().Add(-30*time.Second))
view, err := ts.MakeView(v, time.Minute)
fmt.Println(view, err)
}()
for { for {
select { select {
case scrapeResult := <-scrapeResults: case scrapeResult := <-scrapeResults:
if scrapeResult.Err == nil { if scrapeResult.Err == nil {
persistence.AppendSample(scrapeResult.Sample) // f := model.NewFingerprintFromMetric(scrapeResult.Sample.Metric)
// fmt.Println(f)
// persistence.AppendSample(scrapeResult.Sample)
ts.AppendSample(scrapeResult.Sample) ts.AppendSample(scrapeResult.Sample)
} }
case ruleResult := <-ruleResults: case ruleResult := <-ruleResults:
for _, sample := range ruleResult.Samples { for _, sample := range ruleResult.Samples {
// XXX: Wart // XXX: Wart
persistence.AppendSample(*sample) // persistence.AppendSample(*sample)
ts.AppendSample(*sample) ts.AppendSample(*sample)
} }
} }

View file

@ -39,15 +39,16 @@ message LabelSet {
} }
message SampleKey { message SampleKey {
optional Fingerprint fingerprint = 1; optional Fingerprint fingerprint = 1;
optional bytes timestamp = 2; optional bytes timestamp = 2;
optional int64 last_timestamp = 3; optional int64 last_timestamp = 3;
optional uint32 sample_count = 4;
} }
message SampleValueSeries { message SampleValueSeries {
message Value { message Value {
optional int64 timestamp = 1; optional int64 timestamp = 1;
optional float value = 2; optional float value = 2;
} }
repeated Value value = 1; repeated Value value = 1;
} }

View file

@ -39,17 +39,13 @@ func (f *diskFrontier) String() string {
} }
func newDiskFrontier(i iterator) (d *diskFrontier, err error) { func newDiskFrontier(i iterator) (d *diskFrontier, err error) {
if err != nil {
return
}
i.SeekToLast() i.SeekToLast()
if i.Key() == nil { if i.Key() == nil {
return return
} }
lastKey, err := extractSampleKey(i) lastKey, err := extractSampleKey(i)
if err != nil { if err != nil {
return panic(err)
} }
i.SeekToFirst() i.SeekToFirst()
@ -58,7 +54,7 @@ func newDiskFrontier(i iterator) (d *diskFrontier, err error) {
return return
} }
if err != nil { if err != nil {
return panic(err)
} }
d = &diskFrontier{} d = &diskFrontier{}
@ -107,7 +103,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri
raw, err := coding.NewProtocolBufferEncoder(key).Encode() raw, err := coding.NewProtocolBufferEncoder(key).Encode()
if err != nil { if err != nil {
return panic(err)
} }
i.Seek(raw) i.Seek(raw)
@ -117,7 +113,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri
retrievedKey, err := extractSampleKey(i) retrievedKey, err := extractSampleKey(i)
if err != nil { if err != nil {
return panic(err)
} }
retrievedFingerprint := model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature) retrievedFingerprint := model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature)
@ -133,7 +129,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri
retrievedKey, err = extractSampleKey(i) retrievedKey, err = extractSampleKey(i)
if err != nil { if err != nil {
return panic(err)
} }
retrievedFingerprint := model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature) retrievedFingerprint := model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature)
// If the previous key does not match, we know that the requested // If the previous key does not match, we know that the requested
@ -152,14 +148,14 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri
raw, err = coding.NewProtocolBufferEncoder(key).Encode() raw, err = coding.NewProtocolBufferEncoder(key).Encode()
if err != nil { if err != nil {
return panic(err)
} }
i.Seek(raw) i.Seek(raw)
retrievedKey, err = extractSampleKey(i) retrievedKey, err = extractSampleKey(i)
if err != nil { if err != nil {
return panic(err)
} }
retrievedFingerprint = model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature) retrievedFingerprint = model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature)

View file

@ -31,6 +31,7 @@ const (
appendLabelPairFingerprint = "append_label_pair_fingerprint" appendLabelPairFingerprint = "append_label_pair_fingerprint"
appendSample = "append_sample" appendSample = "append_sample"
appendSamples = "append_samples" appendSamples = "append_samples"
flushMemory = "flush_memory"
getBoundaryValues = "get_boundary_values" getBoundaryValues = "get_boundary_values"
getFingerprintsForLabelName = "get_fingerprints_for_label_name" getFingerprintsForLabelName = "get_fingerprints_for_label_name"
getFingerprintsForLabelSet = "get_fingerprints_for_labelset" getFingerprintsForLabelSet = "get_fingerprints_for_labelset"
@ -42,8 +43,11 @@ const (
hasLabelName = "has_label_name" hasLabelName = "has_label_name"
hasLabelPair = "has_label_pair" hasLabelPair = "has_label_pair"
indexMetric = "index_metric" indexMetric = "index_metric"
rebuildDiskFrontier = "rebuild_disk_frontier"
renderView = "render_view"
setLabelNameFingerprints = "set_label_name_fingerprints" setLabelNameFingerprints = "set_label_name_fingerprints"
setLabelPairFingerprints = "set_label_pair_fingerprints" setLabelPairFingerprints = "set_label_pair_fingerprints"
writeMemory = "write_memory"
) )
var ( var (
@ -53,21 +57,25 @@ var (
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
} }
storageOperations = metrics.NewCounter() storageOperations = metrics.NewCounter()
storageLatency = metrics.NewHistogram(diskLatencyHistogram) storageOperationDurations = metrics.NewCounter()
storageLatency = metrics.NewHistogram(diskLatencyHistogram)
) )
func recordOutcome(counter metrics.Counter, latency metrics.Histogram, duration time.Duration, err error, success, failure map[string]string) { func recordOutcome(duration time.Duration, err error, success, failure map[string]string) {
labels := success labels := success
if err != nil { if err != nil {
labels = failure labels = failure
} }
counter.Increment(labels) storageOperations.Increment(labels)
latency.Add(labels, float64(duration/time.Microsecond)) asFloat := float64(duration / time.Microsecond)
storageLatency.Add(labels, asFloat)
storageOperationDurations.IncrementBy(labels, asFloat)
} }
func init() { func init() {
registry.Register("prometheus_metric_disk_operations_total", "Total number of metric-related disk operations.", registry.NilLabels, storageOperations) registry.Register("prometheus_metric_disk_operations_total", "Total number of metric-related disk operations.", registry.NilLabels, storageOperations)
registry.Register("prometheus_metric_disk_latency_microseconds", "Latency for metric disk operations in microseconds.", registry.NilLabels, storageLatency) registry.Register("prometheus_metric_disk_latency_microseconds", "Latency for metric disk operations in microseconds.", registry.NilLabels, storageLatency)
registry.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", registry.NilLabels, storageOperationDurations)
} }

View file

@ -32,7 +32,8 @@ import (
) )
var ( var (
_ = fmt.Sprintf("") maximumChunkSize = 200
sortConcurrency = 2
) )
type LevelDBMetricPersistence struct { type LevelDBMetricPersistence struct {
@ -189,7 +190,7 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error)
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure}) recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure})
}() }()
err = l.AppendSamples(model.Samples{sample}) err = l.AppendSamples(model.Samples{sample})
@ -197,17 +198,16 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error)
return return
} }
const (
maximumChunkSize = 200
sortConcurrency = 2
)
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) { func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {
c := len(samples)
if c > 1 {
fmt.Printf("Appending %d samples...", c)
}
begin := time.Now() begin := time.Now()
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSample, result: failure}) recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
}() }()
// Group the samples by fingerprint. // Group the samples by fingerprint.
@ -474,6 +474,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
Fingerprint: fingerprint.ToDTO(), Fingerprint: fingerprint.ToDTO(),
Timestamp: indexable.EncodeTime(chunk[0].Timestamp), Timestamp: indexable.EncodeTime(chunk[0].Timestamp),
LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()), LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
SampleCount: proto.Uint32(uint32(take)),
} }
value := &dto.SampleValueSeries{} value := &dto.SampleValueSeries{}
@ -497,7 +498,11 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
k = &dto.SampleKey{} k = &dto.SampleKey{}
err = proto.Unmarshal(i.Key(), k) rawKey := i.Key()
if rawKey == nil {
panic("illegal condition; got nil key...")
}
err = proto.Unmarshal(rawKey, k)
return return
} }
@ -549,7 +554,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}() }()
dtoKey := coding.NewProtocolBufferEncoder(dto) dtoKey := coding.NewProtocolBufferEncoder(dto)
@ -564,7 +569,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}() }()
dtoKey := coding.NewProtocolBufferEncoder(dto) dtoKey := coding.NewProtocolBufferEncoder(dto)
@ -579,7 +584,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}() }()
dtoKey := coding.NewProtocolBufferEncoder(dto) dtoKey := coding.NewProtocolBufferEncoder(dto)
@ -594,7 +599,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure}) recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure})
}() }()
sets := []utility.Set{} sets := []utility.Set{}
@ -644,7 +649,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}() }()
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName))) raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName)))
@ -673,7 +678,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}() }()
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))) raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f)))
@ -706,7 +711,7 @@ func (l *LevelDBMetricPersistence) GetBoundaryValues(m model.Metric, i model.Int
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure}) recordOutcome(duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure})
}() }()
// XXX: Maybe we will want to emit incomplete sets? // XXX: Maybe we will want to emit incomplete sets?
@ -755,7 +760,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m model.Metric, t time.Time, s
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure}) recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure})
}() }()
f := model.NewFingerprintFromMetric(m).ToDTO() f := model.NewFingerprintFromMetric(m).ToDTO()
@ -971,7 +976,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(m model.Metric, i model.Interv
defer func() { defer func() {
duration := time.Now().Sub(begin) duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
}() }()
f := model.NewFingerprintFromMetric(m).ToDTO() f := model.NewFingerprintFromMetric(m).ToDTO()

View file

@ -15,7 +15,10 @@ package metric
import ( import (
"fmt" "fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"sync" "sync"
"time" "time"
@ -26,7 +29,9 @@ import (
type tieredStorage struct { type tieredStorage struct {
appendToDiskQueue chan model.Sample appendToDiskQueue chan model.Sample
appendToMemoryQueue chan model.Sample appendToMemoryQueue chan model.Sample
diskFrontier *diskFrontier
diskStorage *LevelDBMetricPersistence diskStorage *LevelDBMetricPersistence
draining chan bool
flushMemoryInterval time.Duration flushMemoryInterval time.Duration
memoryArena memorySeriesStorage memoryArena memorySeriesStorage
memoryTTL time.Duration memoryTTL time.Duration
@ -42,11 +47,17 @@ type viewJob struct {
err chan error err chan error
} }
// Provides a unified means for batch appending values into the datastore along
// with querying for values in an efficient way.
type Storage interface { type Storage interface {
AppendSample(model.Sample) // Enqueues a Sample for storage.
MakeView(ViewRequestBuilder, time.Duration) (View, error) AppendSample(model.Sample) error
// Enqueus a ViewRequestBuilder for materialization, subject to a timeout.
MakeView(request ViewRequestBuilder, timeout time.Duration) (View, error)
// Starts serving requests.
Serve() Serve()
Expose() // Stops the storage subsystem, flushing all pending operations.
Drain()
} }
func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration) Storage { func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration) Storage {
@ -59,6 +70,7 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
appendToDiskQueue: make(chan model.Sample, appendToDiskQueueDepth), appendToDiskQueue: make(chan model.Sample, appendToDiskQueueDepth),
appendToMemoryQueue: make(chan model.Sample, appendToMemoryQueueDepth), appendToMemoryQueue: make(chan model.Sample, appendToMemoryQueueDepth),
diskStorage: diskStorage, diskStorage: diskStorage,
draining: make(chan bool),
flushMemoryInterval: flushMemoryInterval, flushMemoryInterval: flushMemoryInterval,
memoryArena: NewMemorySeriesStorage(), memoryArena: NewMemorySeriesStorage(),
memoryTTL: memoryTTL, memoryTTL: memoryTTL,
@ -67,11 +79,26 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
} }
} }
func (t *tieredStorage) AppendSample(s model.Sample) { func (t *tieredStorage) AppendSample(s model.Sample) (err error) {
if len(t.draining) > 0 {
return fmt.Errorf("Storage is in the process of draining.")
}
t.appendToMemoryQueue <- s t.appendToMemoryQueue <- s
return
}
func (t *tieredStorage) Drain() {
t.draining <- true
} }
func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) { func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) {
if len(t.draining) > 0 {
err = fmt.Errorf("Storage is in the process of draining.")
return
}
result := make(chan View) result := make(chan View)
errChan := make(chan error) errChan := make(chan error)
t.viewQueue <- viewJob{ t.viewQueue <- viewJob{
@ -92,39 +119,28 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
return return
} }
func (t *tieredStorage) Expose() { func (t *tieredStorage) rebuildDiskFrontier() (err error) {
ticker := time.Tick(5 * time.Second) begin := time.Now()
f := model.NewFingerprintFromRowKey("05232115763668508641-g-97-d") defer func() {
for { duration := time.Now().Sub(begin)
<-ticker
var ( recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
first = time.Now() }()
second = first.Add(1 * time.Minute)
third = first.Add(2 * time.Minute)
)
vrb := NewViewRequestBuilder() i, closer, err := t.diskStorage.metricSamples.GetIterator()
fmt.Printf("vrb -> %s\n", vrb) if closer != nil {
vrb.GetMetricRange(f, first, second) defer closer.Close()
vrb.GetMetricRange(f, first, third)
js := vrb.ScanJobs()
consume(js[0])
// fmt.Printf("js -> %s\n", js)
// js.Represent(t.diskStorage, t.memoryArena)
// i, c, _ := t.diskStorage.metricSamples.GetIterator()
// start := time.Now()
// f, _ := newDiskFrontier(i)
// fmt.Printf("df -> %s\n", time.Since(start))
// fmt.Printf("df -- -> %s\n", f)
// start = time.Now()
// // sf, _ := newSeriesFrontier(model.NewFingerprintFromRowKey("05232115763668508641-g-97-d"), *f, i)
// // sf, _ := newSeriesFrontier(model.NewFingerprintFromRowKey("16879485108969112708-g-184-s"), *f, i)
// sf, _ := newSeriesFrontier(model.NewFingerprintFromRowKey("08437776163162606855-g-169-s"), *f, i)
// fmt.Printf("sf -> %s\n", time.Since(start))
// fmt.Printf("sf -- -> %s\n", sf)
// c.Close()
} }
if err != nil {
panic(err)
}
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
panic(err)
}
return
} }
func (t *tieredStorage) Serve() { func (t *tieredStorage) Serve() {
@ -132,6 +148,7 @@ func (t *tieredStorage) Serve() {
flushMemoryTicker = time.Tick(t.flushMemoryInterval) flushMemoryTicker = time.Tick(t.flushMemoryInterval)
writeMemoryTicker = time.Tick(t.writeMemoryInterval) writeMemoryTicker = time.Tick(t.writeMemoryInterval)
) )
for { for {
select { select {
case <-writeMemoryTicker: case <-writeMemoryTicker:
@ -140,11 +157,21 @@ func (t *tieredStorage) Serve() {
t.flushMemory() t.flushMemory()
case viewRequest := <-t.viewQueue: case viewRequest := <-t.viewQueue:
t.renderView(viewRequest) t.renderView(viewRequest)
case <-t.draining:
t.flush()
return
} }
} }
} }
func (t *tieredStorage) writeMemory() { func (t *tieredStorage) writeMemory() {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure})
}()
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
@ -228,7 +255,7 @@ func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDe
flusher: f, flusher: f,
} }
fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey()) // fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey())
return visitor, visitor, visitor return visitor, visitor, visitor
} }
@ -239,17 +266,21 @@ func (f *memoryToDiskFlusher) Flush() {
for i := 0; i < length; i++ { for i := 0; i < length; i++ {
samples = append(samples, <-f.toDiskQueue) samples = append(samples, <-f.toDiskQueue)
} }
fmt.Printf("%d samples to write\n", length)
f.disk.AppendSamples(samples) f.disk.AppendSamples(samples)
} }
func (f memoryToDiskFlusher) Close() { func (f memoryToDiskFlusher) Close() {
fmt.Println("memory flusher close")
f.Flush() f.Flush()
} }
// Persist a whole bunch of samples to the datastore. // Persist a whole bunch of samples to the datastore.
func (t *tieredStorage) flushMemory() { func (t *tieredStorage) flushMemory() {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure})
}()
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
@ -261,57 +292,160 @@ func (t *tieredStorage) flushMemory() {
} }
defer flusher.Close() defer flusher.Close()
v := time.Now()
t.memoryArena.ForEachSample(flusher) t.memoryArena.ForEachSample(flusher)
fmt.Printf("Done flushing memory in %s", time.Since(v))
return return
} }
func (t *tieredStorage) renderView(viewJob viewJob) (err error) { func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: renderView, result: failure})
}()
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
return
}
func consume(s scanJob) {
var ( var (
standingOperations = ops{} scans = viewJob.builder.ScanJobs()
lastTime = time.Time{} // standingOperations = ops{}
// lastTime = time.Time{}
) )
for { // Rebuilding of the frontier should happen on a conditional basis if a
if len(s.operations) == 0 { // (fingerprint, timestamp) tuple is outside of the current frontier.
if len(standingOperations) > 0 { err = t.rebuildDiskFrontier()
var ( if err != nil {
intervals = collectIntervals(standingOperations) panic(err)
ranges = collectRanges(standingOperations) }
)
if len(intervals) > 0 { iterator, closer, err := t.diskStorage.metricSamples.GetIterator()
} if closer != nil {
defer closer.Close()
}
if err != nil {
panic(err)
}
if len(ranges) > 0 { for _, scanJob := range scans {
if len(ranges) > 0 { // XXX: Memoize the last retrieval for forward scans.
var (
standingOperations ops
)
fmt.Printf("Starting scan of %s...\n", scanJob)
// If the fingerprint is outside of the known frontier for the disk, the
// disk won't be queried at this time.
if !(t.diskFrontier == nil || scanJob.fingerprint.Less(t.diskFrontier.firstFingerprint) || t.diskFrontier.lastFingerprint.Less(scanJob.fingerprint)) {
fmt.Printf("Using diskFrontier %s\n", t.diskFrontier)
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
fmt.Printf("Using seriesFrontier %s\n", seriesFrontier)
if err != nil {
panic(err)
}
if seriesFrontier != nil {
for _, operation := range scanJob.operations {
scanJob.operations = scanJob.operations[1:len(scanJob.operations)]
// if operation.StartsAt().Before(seriesFrontier.firstSupertime) {
// fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime)
// continue
// }
// if seriesFrontier.lastTime.Before(operation.StartsAt()) {
// fmt.Printf("operation %s occurs after %s; discarding...\n", operation, seriesFrontier.lastTime)
// continue
// }
var (
targetKey = &dto.SampleKey{}
foundKey = &dto.SampleKey{}
)
targetKey.Fingerprint = scanJob.fingerprint.ToDTO()
targetKey.Timestamp = indexable.EncodeTime(operation.StartsAt())
fmt.Println("target (unencoded) ->", targetKey)
rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode()
iterator.Seek(rawKey)
foundKey, err = extractSampleKey(iterator)
if err != nil {
panic(err)
}
fmt.Printf("startAt -> %s\n", operation.StartsAt())
fmt.Println("target ->", rawKey)
fmt.Println("found ->", iterator.Key())
fst := indexable.DecodeTime(foundKey.Timestamp)
lst := time.Unix(*foundKey.LastTimestamp, 0)
fmt.Printf("(%s, %s)\n", fst, lst)
fmt.Println(rawKey)
fmt.Println(foundKey)
if !((operation.StartsAt().Before(fst)) || lst.Before(operation.StartsAt())) {
fmt.Printf("operation %s occurs inside of %s...\n", operation, foundKey)
} else {
for i := 0; i < 3; i++ {
iterator.Next()
fmt.Println(i)
foundKey, err = extractSampleKey(iterator)
if err != nil {
panic(err)
}
fst = indexable.DecodeTime(foundKey.Timestamp)
lst = time.Unix(*foundKey.LastTimestamp, 0)
fmt.Println("found ->", iterator.Key())
fmt.Printf("(%s, %s)\n", fst, lst)
fmt.Println(foundKey)
}
standingOperations = append(standingOperations, operation)
} }
} }
break
} }
} }
operation := s.operations[0]
if operation.StartsAt().Equal(lastTime) {
standingOperations = append(standingOperations, operation)
} else {
standingOperations = ops{operation}
lastTime = operation.StartsAt()
}
s.operations = s.operations[1:len(s.operations)]
} }
// for {
// if len(s.operations) == 0 {
// if len(standingOperations) > 0 {
// var (
// intervals = collectIntervals(standingOperations)
// ranges = collectRanges(standingOperations)
// )
// if len(intervals) > 0 {
// }
// if len(ranges) > 0 {
// if len(ranges) > 0 {
// }
// }
// break
// }
// }
// operation := s.operations[0]
// if operation.StartsAt().Equal(lastTime) {
// standingOperations = append(standingOperations, operation)
// } else {
// standingOperations = ops{operation}
// lastTime = operation.StartsAt()
// }
// s.operations = s.operations[1:len(s.operations)]
// }
return
} }
func (s scanJobs) Represent(d *LevelDBMetricPersistence, m memorySeriesStorage) (storage *memorySeriesStorage, err error) { func (s scanJobs) Represent(d *LevelDBMetricPersistence, m memorySeriesStorage) (storage *memorySeriesStorage, err error) {