From f39b9c3c8e1efca8efb9a6b0f00ae863b51ec7c8 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 1 Mar 2013 09:51:36 -0800 Subject: [PATCH] Checkpoint. --- coding/protocol_buffer.go | 11 +- main.go | 29 +++- model/data.proto | 9 +- storage/metric/frontier.go | 18 +- storage/metric/instrumentation.go | 18 +- storage/metric/leveldb.go | 41 +++-- storage/metric/tiered.go | 268 ++++++++++++++++++++++-------- 7 files changed, 284 insertions(+), 110 deletions(-) diff --git a/coding/protocol_buffer.go b/coding/protocol_buffer.go index 8ad93424df..273395207e 100644 --- a/coding/protocol_buffer.go +++ b/coding/protocol_buffer.go @@ -21,8 +21,15 @@ type ProtocolBufferEncoder struct { message proto.Message } -func (p *ProtocolBufferEncoder) Encode() ([]byte, error) { - return proto.Marshal(p.message) +func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) { + raw, err = proto.Marshal(p.message) + + // XXX: Adjust legacy users of this to not check for error. + if err != nil { + panic(err) + } + + return } func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder { diff --git a/main.go b/main.go index 0e6f1b73f9..50f90c55ad 100644 --- a/main.go +++ b/main.go @@ -15,8 +15,10 @@ package main import ( "flag" + "fmt" "github.com/prometheus/prometheus/appstate" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval/format" "github.com/prometheus/prometheus/rules" @@ -31,6 +33,8 @@ import ( // Commandline flags. var ( + _ = fmt.Sprintf("") + configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.") metricsStoragePath = flag.String("metricsStoragePath", "/tmp/metrics", "Base path for metrics storage.") scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.") @@ -92,20 +96,39 @@ func main() { ts := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20) go ts.Serve() - go ts.Expose() + + go func() { + ticker := time.Tick(time.Second) + for i := 0; i < 5; i++ { + <-ticker + if i%10 == 0 { + fmt.Printf(".") + } + } + fmt.Println() + //f := model.NewFingerprintFromRowKey("9776005627788788740-g-131-0") + f := model.NewFingerprintFromRowKey("09923616460706181007-g-131-0") + v := metric.NewViewRequestBuilder() + v.GetMetricAtTime(f, time.Now().Add(-30*time.Second)) + + view, err := ts.MakeView(v, time.Minute) + fmt.Println(view, err) + }() for { select { case scrapeResult := <-scrapeResults: if scrapeResult.Err == nil { - persistence.AppendSample(scrapeResult.Sample) + // f := model.NewFingerprintFromMetric(scrapeResult.Sample.Metric) + // fmt.Println(f) + // persistence.AppendSample(scrapeResult.Sample) ts.AppendSample(scrapeResult.Sample) } case ruleResult := <-ruleResults: for _, sample := range ruleResult.Samples { // XXX: Wart - persistence.AppendSample(*sample) + // persistence.AppendSample(*sample) ts.AppendSample(*sample) } } diff --git a/model/data.proto b/model/data.proto index 03b096d051..7b8c97514d 100644 --- a/model/data.proto +++ b/model/data.proto @@ -39,15 +39,16 @@ message LabelSet { } message SampleKey { - optional Fingerprint fingerprint = 1; - optional bytes timestamp = 2; - optional int64 last_timestamp = 3; + optional Fingerprint fingerprint = 1; + optional bytes timestamp = 2; + optional int64 last_timestamp = 3; + optional uint32 sample_count = 4; } message SampleValueSeries { message Value { optional int64 timestamp = 1; - optional float value = 2; + optional float value = 2; } repeated Value value = 1; } diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go index 9f1231f224..a1e4b579c5 100644 --- a/storage/metric/frontier.go +++ b/storage/metric/frontier.go @@ -39,17 +39,13 @@ func (f *diskFrontier) String() string { } func newDiskFrontier(i iterator) (d *diskFrontier, err error) { - if err != nil { - return - } - i.SeekToLast() if i.Key() == nil { return } lastKey, err := extractSampleKey(i) if err != nil { - return + panic(err) } i.SeekToFirst() @@ -58,7 +54,7 @@ func newDiskFrontier(i iterator) (d *diskFrontier, err error) { return } if err != nil { - return + panic(err) } d = &diskFrontier{} @@ -107,7 +103,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri raw, err := coding.NewProtocolBufferEncoder(key).Encode() if err != nil { - return + panic(err) } i.Seek(raw) @@ -117,7 +113,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri retrievedKey, err := extractSampleKey(i) if err != nil { - return + panic(err) } retrievedFingerprint := model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature) @@ -133,7 +129,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri retrievedKey, err = extractSampleKey(i) if err != nil { - return + panic(err) } retrievedFingerprint := model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature) // If the previous key does not match, we know that the requested @@ -152,14 +148,14 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri raw, err = coding.NewProtocolBufferEncoder(key).Encode() if err != nil { - return + panic(err) } i.Seek(raw) retrievedKey, err = extractSampleKey(i) if err != nil { - return + panic(err) } retrievedFingerprint = model.NewFingerprintFromRowKey(*retrievedKey.Fingerprint.Signature) diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index d06d6c3d51..2abd3a7511 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -31,6 +31,7 @@ const ( appendLabelPairFingerprint = "append_label_pair_fingerprint" appendSample = "append_sample" appendSamples = "append_samples" + flushMemory = "flush_memory" getBoundaryValues = "get_boundary_values" getFingerprintsForLabelName = "get_fingerprints_for_label_name" getFingerprintsForLabelSet = "get_fingerprints_for_labelset" @@ -42,8 +43,11 @@ const ( hasLabelName = "has_label_name" hasLabelPair = "has_label_pair" indexMetric = "index_metric" + rebuildDiskFrontier = "rebuild_disk_frontier" + renderView = "render_view" setLabelNameFingerprints = "set_label_name_fingerprints" setLabelPairFingerprints = "set_label_pair_fingerprints" + writeMemory = "write_memory" ) var ( @@ -53,21 +57,25 @@ var ( ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, } - storageOperations = metrics.NewCounter() - storageLatency = metrics.NewHistogram(diskLatencyHistogram) + storageOperations = metrics.NewCounter() + storageOperationDurations = metrics.NewCounter() + storageLatency = metrics.NewHistogram(diskLatencyHistogram) ) -func recordOutcome(counter metrics.Counter, latency metrics.Histogram, duration time.Duration, err error, success, failure map[string]string) { +func recordOutcome(duration time.Duration, err error, success, failure map[string]string) { labels := success if err != nil { labels = failure } - counter.Increment(labels) - latency.Add(labels, float64(duration/time.Microsecond)) + storageOperations.Increment(labels) + asFloat := float64(duration / time.Microsecond) + storageLatency.Add(labels, asFloat) + storageOperationDurations.IncrementBy(labels, asFloat) } func init() { registry.Register("prometheus_metric_disk_operations_total", "Total number of metric-related disk operations.", registry.NilLabels, storageOperations) registry.Register("prometheus_metric_disk_latency_microseconds", "Latency for metric disk operations in microseconds.", registry.NilLabels, storageLatency) + registry.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", registry.NilLabels, storageOperationDurations) } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index ff688a79d3..6956bbae82 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -32,7 +32,8 @@ import ( ) var ( - _ = fmt.Sprintf("") + maximumChunkSize = 200 + sortConcurrency = 2 ) type LevelDBMetricPersistence struct { @@ -189,7 +190,7 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure}) + recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure}) }() err = l.AppendSamples(model.Samples{sample}) @@ -197,17 +198,16 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) return } -const ( - maximumChunkSize = 200 - sortConcurrency = 2 -) - func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) { + c := len(samples) + if c > 1 { + fmt.Printf("Appending %d samples...", c) + } begin := time.Now() defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSample, result: failure}) + recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure}) }() // Group the samples by fingerprint. @@ -474,6 +474,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err Fingerprint: fingerprint.ToDTO(), Timestamp: indexable.EncodeTime(chunk[0].Timestamp), LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()), + SampleCount: proto.Uint32(uint32(take)), } value := &dto.SampleValueSeries{} @@ -497,7 +498,11 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { k = &dto.SampleKey{} - err = proto.Unmarshal(i.Key(), k) + rawKey := i.Key() + if rawKey == nil { + panic("illegal condition; got nil key...") + } + err = proto.Unmarshal(rawKey, k) return } @@ -549,7 +554,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) + recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }() dtoKey := coding.NewProtocolBufferEncoder(dto) @@ -564,7 +569,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) + recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }() dtoKey := coding.NewProtocolBufferEncoder(dto) @@ -579,7 +584,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) + recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }() dtoKey := coding.NewProtocolBufferEncoder(dto) @@ -594,7 +599,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure}) + recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure}) }() sets := []utility.Set{} @@ -644,7 +649,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) + recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }() raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName))) @@ -673,7 +678,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) + recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }() raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))) @@ -706,7 +711,7 @@ func (l *LevelDBMetricPersistence) GetBoundaryValues(m model.Metric, i model.Int defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure}) + recordOutcome(duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure}) }() // XXX: Maybe we will want to emit incomplete sets? @@ -755,7 +760,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m model.Metric, t time.Time, s defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure}) + recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure}) }() f := model.NewFingerprintFromMetric(m).ToDTO() @@ -971,7 +976,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(m model.Metric, i model.Interv defer func() { duration := time.Now().Sub(begin) - recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) + recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) }() f := model.NewFingerprintFromMetric(m).ToDTO() diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 362e22c4ed..2d52c5949b 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -15,7 +15,10 @@ package metric import ( "fmt" + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" + dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/storage" "sync" "time" @@ -26,7 +29,9 @@ import ( type tieredStorage struct { appendToDiskQueue chan model.Sample appendToMemoryQueue chan model.Sample + diskFrontier *diskFrontier diskStorage *LevelDBMetricPersistence + draining chan bool flushMemoryInterval time.Duration memoryArena memorySeriesStorage memoryTTL time.Duration @@ -42,11 +47,17 @@ type viewJob struct { err chan error } +// Provides a unified means for batch appending values into the datastore along +// with querying for values in an efficient way. type Storage interface { - AppendSample(model.Sample) - MakeView(ViewRequestBuilder, time.Duration) (View, error) + // Enqueues a Sample for storage. + AppendSample(model.Sample) error + // Enqueus a ViewRequestBuilder for materialization, subject to a timeout. + MakeView(request ViewRequestBuilder, timeout time.Duration) (View, error) + // Starts serving requests. Serve() - Expose() + // Stops the storage subsystem, flushing all pending operations. + Drain() } func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration) Storage { @@ -59,6 +70,7 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu appendToDiskQueue: make(chan model.Sample, appendToDiskQueueDepth), appendToMemoryQueue: make(chan model.Sample, appendToMemoryQueueDepth), diskStorage: diskStorage, + draining: make(chan bool), flushMemoryInterval: flushMemoryInterval, memoryArena: NewMemorySeriesStorage(), memoryTTL: memoryTTL, @@ -67,11 +79,26 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu } } -func (t *tieredStorage) AppendSample(s model.Sample) { +func (t *tieredStorage) AppendSample(s model.Sample) (err error) { + if len(t.draining) > 0 { + return fmt.Errorf("Storage is in the process of draining.") + } + t.appendToMemoryQueue <- s + + return +} + +func (t *tieredStorage) Drain() { + t.draining <- true } func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) { + if len(t.draining) > 0 { + err = fmt.Errorf("Storage is in the process of draining.") + return + } + result := make(chan View) errChan := make(chan error) t.viewQueue <- viewJob{ @@ -92,39 +119,28 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat return } -func (t *tieredStorage) Expose() { - ticker := time.Tick(5 * time.Second) - f := model.NewFingerprintFromRowKey("05232115763668508641-g-97-d") - for { - <-ticker +func (t *tieredStorage) rebuildDiskFrontier() (err error) { + begin := time.Now() + defer func() { + duration := time.Now().Sub(begin) - var ( - first = time.Now() - second = first.Add(1 * time.Minute) - third = first.Add(2 * time.Minute) - ) + recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure}) + }() - vrb := NewViewRequestBuilder() - fmt.Printf("vrb -> %s\n", vrb) - vrb.GetMetricRange(f, first, second) - vrb.GetMetricRange(f, first, third) - js := vrb.ScanJobs() - consume(js[0]) - // fmt.Printf("js -> %s\n", js) - // js.Represent(t.diskStorage, t.memoryArena) - // i, c, _ := t.diskStorage.metricSamples.GetIterator() - // start := time.Now() - // f, _ := newDiskFrontier(i) - // fmt.Printf("df -> %s\n", time.Since(start)) - // fmt.Printf("df -- -> %s\n", f) - // start = time.Now() - // // sf, _ := newSeriesFrontier(model.NewFingerprintFromRowKey("05232115763668508641-g-97-d"), *f, i) - // // sf, _ := newSeriesFrontier(model.NewFingerprintFromRowKey("16879485108969112708-g-184-s"), *f, i) - // sf, _ := newSeriesFrontier(model.NewFingerprintFromRowKey("08437776163162606855-g-169-s"), *f, i) - // fmt.Printf("sf -> %s\n", time.Since(start)) - // fmt.Printf("sf -- -> %s\n", sf) - // c.Close() + i, closer, err := t.diskStorage.metricSamples.GetIterator() + if closer != nil { + defer closer.Close() } + if err != nil { + panic(err) + } + + t.diskFrontier, err = newDiskFrontier(i) + if err != nil { + panic(err) + } + + return } func (t *tieredStorage) Serve() { @@ -132,6 +148,7 @@ func (t *tieredStorage) Serve() { flushMemoryTicker = time.Tick(t.flushMemoryInterval) writeMemoryTicker = time.Tick(t.writeMemoryInterval) ) + for { select { case <-writeMemoryTicker: @@ -140,11 +157,21 @@ func (t *tieredStorage) Serve() { t.flushMemory() case viewRequest := <-t.viewQueue: t.renderView(viewRequest) + case <-t.draining: + t.flush() + return } } } func (t *tieredStorage) writeMemory() { + begin := time.Now() + defer func() { + duration := time.Now().Sub(begin) + + recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure}) + }() + t.mutex.Lock() defer t.mutex.Unlock() @@ -228,7 +255,7 @@ func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDe flusher: f, } - fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey()) + // fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey()) return visitor, visitor, visitor } @@ -239,17 +266,21 @@ func (f *memoryToDiskFlusher) Flush() { for i := 0; i < length; i++ { samples = append(samples, <-f.toDiskQueue) } - fmt.Printf("%d samples to write\n", length) f.disk.AppendSamples(samples) } func (f memoryToDiskFlusher) Close() { - fmt.Println("memory flusher close") f.Flush() } // Persist a whole bunch of samples to the datastore. func (t *tieredStorage) flushMemory() { + begin := time.Now() + defer func() { + duration := time.Now().Sub(begin) + + recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure}) + }() t.mutex.Lock() defer t.mutex.Unlock() @@ -261,57 +292,160 @@ func (t *tieredStorage) flushMemory() { } defer flusher.Close() - v := time.Now() t.memoryArena.ForEachSample(flusher) - fmt.Printf("Done flushing memory in %s", time.Since(v)) return } func (t *tieredStorage) renderView(viewJob viewJob) (err error) { + begin := time.Now() + defer func() { + duration := time.Now().Sub(begin) + + recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: renderView, result: failure}) + }() + t.mutex.Lock() defer t.mutex.Unlock() - return -} - - -func consume(s scanJob) { var ( - standingOperations = ops{} - lastTime = time.Time{} + scans = viewJob.builder.ScanJobs() + // standingOperations = ops{} + // lastTime = time.Time{} ) - for { - if len(s.operations) == 0 { - if len(standingOperations) > 0 { - var ( - intervals = collectIntervals(standingOperations) - ranges = collectRanges(standingOperations) - ) + // Rebuilding of the frontier should happen on a conditional basis if a + // (fingerprint, timestamp) tuple is outside of the current frontier. + err = t.rebuildDiskFrontier() + if err != nil { + panic(err) + } - if len(intervals) > 0 { - } + iterator, closer, err := t.diskStorage.metricSamples.GetIterator() + if closer != nil { + defer closer.Close() + } + if err != nil { + panic(err) + } - if len(ranges) > 0 { - if len(ranges) > 0 { + for _, scanJob := range scans { + // XXX: Memoize the last retrieval for forward scans. + var ( + standingOperations ops + ) + fmt.Printf("Starting scan of %s...\n", scanJob) + // If the fingerprint is outside of the known frontier for the disk, the + // disk won't be queried at this time. + if !(t.diskFrontier == nil || scanJob.fingerprint.Less(t.diskFrontier.firstFingerprint) || t.diskFrontier.lastFingerprint.Less(scanJob.fingerprint)) { + fmt.Printf("Using diskFrontier %s\n", t.diskFrontier) + seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) + fmt.Printf("Using seriesFrontier %s\n", seriesFrontier) + if err != nil { + panic(err) + } + + if seriesFrontier != nil { + for _, operation := range scanJob.operations { + scanJob.operations = scanJob.operations[1:len(scanJob.operations)] + + // if operation.StartsAt().Before(seriesFrontier.firstSupertime) { + // fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime) + // continue + // } + + // if seriesFrontier.lastTime.Before(operation.StartsAt()) { + // fmt.Printf("operation %s occurs after %s; discarding...\n", operation, seriesFrontier.lastTime) + // continue + // } + + var ( + targetKey = &dto.SampleKey{} + foundKey = &dto.SampleKey{} + ) + + targetKey.Fingerprint = scanJob.fingerprint.ToDTO() + targetKey.Timestamp = indexable.EncodeTime(operation.StartsAt()) + + fmt.Println("target (unencoded) ->", targetKey) + rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode() + + iterator.Seek(rawKey) + + foundKey, err = extractSampleKey(iterator) + if err != nil { + panic(err) + } + + fmt.Printf("startAt -> %s\n", operation.StartsAt()) + fmt.Println("target ->", rawKey) + fmt.Println("found ->", iterator.Key()) + fst := indexable.DecodeTime(foundKey.Timestamp) + lst := time.Unix(*foundKey.LastTimestamp, 0) + fmt.Printf("(%s, %s)\n", fst, lst) + fmt.Println(rawKey) + fmt.Println(foundKey) + + if !((operation.StartsAt().Before(fst)) || lst.Before(operation.StartsAt())) { + fmt.Printf("operation %s occurs inside of %s...\n", operation, foundKey) + } else { + for i := 0; i < 3; i++ { + iterator.Next() + + fmt.Println(i) + foundKey, err = extractSampleKey(iterator) + if err != nil { + panic(err) + } + + fst = indexable.DecodeTime(foundKey.Timestamp) + lst = time.Unix(*foundKey.LastTimestamp, 0) + fmt.Println("found ->", iterator.Key()) + fmt.Printf("(%s, %s)\n", fst, lst) + fmt.Println(foundKey) + } + + standingOperations = append(standingOperations, operation) } } - break } } - operation := s.operations[0] - if operation.StartsAt().Equal(lastTime) { - standingOperations = append(standingOperations, operation) - } else { - standingOperations = ops{operation} - lastTime = operation.StartsAt() - } - - s.operations = s.operations[1:len(s.operations)] } + + // for { + // if len(s.operations) == 0 { + // if len(standingOperations) > 0 { + // var ( + // intervals = collectIntervals(standingOperations) + // ranges = collectRanges(standingOperations) + // ) + + // if len(intervals) > 0 { + // } + + // if len(ranges) > 0 { + // if len(ranges) > 0 { + + // } + // } + // break + // } + // } + + // operation := s.operations[0] + // if operation.StartsAt().Equal(lastTime) { + // standingOperations = append(standingOperations, operation) + // } else { + // standingOperations = ops{operation} + // lastTime = operation.StartsAt() + // } + + // s.operations = s.operations[1:len(s.operations)] + // } + + return } func (s scanJobs) Represent(d *LevelDBMetricPersistence, m memorySeriesStorage) (storage *memorySeriesStorage, err error) {