mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Checkpoint.
This commit is contained in:
parent
8cc5cdde0b
commit
34a921e16d
|
@ -498,6 +498,10 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
|
|||
}
|
||||
|
||||
func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
|
||||
if i == nil {
|
||||
panic("nil iterator")
|
||||
}
|
||||
|
||||
k = &dto.SampleKey{}
|
||||
rawKey := i.Key()
|
||||
if rawKey == nil {
|
||||
|
@ -509,6 +513,10 @@ func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
|
|||
}
|
||||
|
||||
func extractSampleValue(i iterator) (v *dto.SampleValueSeries, err error) {
|
||||
if i == nil {
|
||||
panic("nil iterator")
|
||||
}
|
||||
|
||||
v = &dto.SampleValueSeries{}
|
||||
err = proto.Unmarshal(i.Value(), v)
|
||||
|
||||
|
|
|
@ -124,7 +124,6 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
|
|||
}
|
||||
|
||||
func (t *tieredStorage) rebuildDiskFrontier() (err error) {
|
||||
fmt.Println("a1")
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
duration := time.Now().Sub(begin)
|
||||
|
@ -354,11 +353,11 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
|
|||
// standingOperations ops
|
||||
)
|
||||
|
||||
fmt.Printf("Starting scan of %s...\n", scanJob)
|
||||
// fmt.Printf("Starting scan of %s...\n", scanJob)
|
||||
if !(t.diskFrontier == nil || scanJob.fingerprint.Less(t.diskFrontier.firstFingerprint) || t.diskFrontier.lastFingerprint.Less(scanJob.fingerprint)) {
|
||||
fmt.Printf("Using diskFrontier %s\n", t.diskFrontier)
|
||||
// fmt.Printf("Using diskFrontier %s\n", t.diskFrontier)
|
||||
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
|
||||
fmt.Printf("Using seriesFrontier %s\n", seriesFrontier)
|
||||
// fmt.Printf("Using seriesFrontier %s\n", seriesFrontier)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -372,14 +371,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
|
|||
|
||||
for _, operation := range scanJob.operations {
|
||||
if seriesFrontier.lastTime.Before(operation.StartsAt()) {
|
||||
fmt.Printf("operation %s occurs after %s; aborting...\n", operation, seriesFrontier.lastTime)
|
||||
// fmt.Printf("operation %s occurs after %s; aborting...\n", operation, seriesFrontier.lastTime)
|
||||
break
|
||||
}
|
||||
|
||||
scanJob.operations = scanJob.operations[1:len(scanJob.operations)]
|
||||
|
||||
if operation.StartsAt().Before(seriesFrontier.firstSupertime) {
|
||||
fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime)
|
||||
// fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -388,6 +387,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
|
|||
|
||||
rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode()
|
||||
|
||||
// XXX: Use frontiers to manage out of range queries.
|
||||
iterator.Seek(rawKey)
|
||||
|
||||
foundKey, err = extractSampleKey(iterator)
|
||||
|
@ -401,13 +401,11 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
|
|||
)
|
||||
|
||||
if !((operation.StartsAt().Before(fst)) || lst.Before(operation.StartsAt())) {
|
||||
fmt.Printf("operation %s occurs inside of %s...\n", operation, foundKey)
|
||||
// fmt.Printf("operation %s occurs inside of %s...\n", operation, foundKey)
|
||||
foundValue, err = extractSampleValue(iterator)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Printf("f -> %s\n", foundValue)
|
||||
} else if operation.StartsAt().Before(fst) {
|
||||
fmt.Printf("operation %s may occur in next entity; fast forwarding...\n", operation)
|
||||
panic("oops")
|
||||
|
@ -423,14 +421,20 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
|
|||
index = sort.Search(elementCount, searcher)
|
||||
)
|
||||
|
||||
foundValue.Value = foundValue.Value[index:elementCount]
|
||||
if index != elementCount {
|
||||
if index > 0 {
|
||||
index--
|
||||
}
|
||||
|
||||
foundValue.Value = foundValue.Value[index:elementCount]
|
||||
}
|
||||
switch operation.(type) {
|
||||
case getValuesAtTimeOp:
|
||||
if len(foundValue.Value) > 0 {
|
||||
view.appendSample(scanJob.fingerprint, time.Unix(*foundValue.Value[0].Timestamp, 0), model.SampleValue(*foundValue.Value[0].Value))
|
||||
}
|
||||
if len(foundValue.Value) > 1 {
|
||||
view.appendSample(scanJob.fingerprint, time.Unix(*foundValue.Value[1].Timestamp, 0), model.SampleValue(*foundValue.Value[1].Value))
|
||||
}
|
||||
default:
|
||||
panic("unhandled")
|
||||
|
|
225
storage/metric/tiered_test.go
Normal file
225
storage/metric/tiered_test.go
Normal file
|
@ -0,0 +1,225 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/model"
|
||||
"github.com/prometheus/prometheus/utility/test"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func testMakeView(t test.Tester) {
|
||||
type in struct {
|
||||
atTime []getValuesAtTimeOp
|
||||
atInterval []getValuesAtIntervalOp
|
||||
alongRange []getValuesAlongRangeOp
|
||||
}
|
||||
|
||||
type out struct {
|
||||
atTime [][]model.SamplePair
|
||||
atInterval [][]model.SamplePair
|
||||
alongRange [][]model.SamplePair
|
||||
}
|
||||
var (
|
||||
instant = time.Date(1984, 3, 30, 0, 0, 0, 0, time.UTC)
|
||||
metric = model.Metric{"name": "request_count"}
|
||||
fingerprint = model.NewFingerprintFromMetric(metric)
|
||||
scenarios = []struct {
|
||||
data []model.Sample
|
||||
in in
|
||||
out out
|
||||
}{
|
||||
{
|
||||
data: []model.Sample{
|
||||
{
|
||||
Metric: metric,
|
||||
Value: 0,
|
||||
Timestamp: instant,
|
||||
},
|
||||
},
|
||||
in: in{
|
||||
atTime: []getValuesAtTimeOp{
|
||||
{
|
||||
time: instant,
|
||||
},
|
||||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
{
|
||||
{
|
||||
Timestamp: instant,
|
||||
Value: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
data: []model.Sample{
|
||||
{
|
||||
Metric: metric,
|
||||
Value: 0,
|
||||
Timestamp: instant,
|
||||
},
|
||||
{
|
||||
Metric: metric,
|
||||
Value: 1,
|
||||
Timestamp: instant.Add(time.Second),
|
||||
},
|
||||
},
|
||||
in: in{
|
||||
atTime: []getValuesAtTimeOp{
|
||||
{
|
||||
time: instant,
|
||||
},
|
||||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
{
|
||||
{
|
||||
Timestamp: instant,
|
||||
Value: 0,
|
||||
},
|
||||
{
|
||||
Timestamp: instant.Add(time.Second),
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// {
|
||||
// data: []model.Sample{
|
||||
// {
|
||||
// Metric: metric,
|
||||
// Value: 0,
|
||||
// Timestamp: instant,
|
||||
// },
|
||||
// {
|
||||
// Metric: metric,
|
||||
// Value: 1,
|
||||
// Timestamp: instant.Add(time.Second),
|
||||
// },
|
||||
// {
|
||||
// Metric: metric,
|
||||
// Value: 2,
|
||||
// Timestamp: instant.Add(time.Second * 2),
|
||||
// },
|
||||
// },
|
||||
// in: in{
|
||||
// atTime: []getValuesAtTimeOp{
|
||||
// {
|
||||
// time: instant.Add(time.Second),
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// out: out{
|
||||
// atTime: [][]model.SamplePair{
|
||||
// {
|
||||
// {
|
||||
// Timestamp: instant.Add(time.Second),
|
||||
// Value: 1,
|
||||
// },
|
||||
// {
|
||||
// Timestamp: instant.Add(time.Second * 2),
|
||||
// Value: 2,
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
}
|
||||
)
|
||||
|
||||
for i, scenario := range scenarios {
|
||||
var (
|
||||
temporary, _ = ioutil.TempDir("", "test_make_view")
|
||||
tiered = NewTieredStorage(100, 100, 100, time.Second, time.Second, 0*time.Second, temporary)
|
||||
)
|
||||
|
||||
if tiered == nil {
|
||||
t.Fatalf("%d. tiered == nil", i)
|
||||
}
|
||||
|
||||
go tiered.Serve()
|
||||
defer tiered.Drain()
|
||||
|
||||
defer func() {
|
||||
os.RemoveAll(temporary)
|
||||
}()
|
||||
|
||||
for j, datum := range scenario.data {
|
||||
err := tiered.AppendSample(datum)
|
||||
if err != nil {
|
||||
t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err)
|
||||
}
|
||||
}
|
||||
|
||||
tiered.Flush()
|
||||
|
||||
requestBuilder := NewViewRequestBuilder()
|
||||
|
||||
for _, atTime := range scenario.in.atTime {
|
||||
requestBuilder.GetMetricAtTime(fingerprint, atTime.time)
|
||||
}
|
||||
|
||||
for _, atInterval := range scenario.in.atInterval {
|
||||
requestBuilder.GetMetricAtInterval(fingerprint, atInterval.from, atInterval.through, atInterval.interval)
|
||||
}
|
||||
|
||||
for _, alongRange := range scenario.in.alongRange {
|
||||
requestBuilder.GetMetricRange(fingerprint, alongRange.from, alongRange.through)
|
||||
}
|
||||
|
||||
v, err := tiered.MakeView(requestBuilder, time.Second*5)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("%d. failed due to %s", i, err)
|
||||
}
|
||||
|
||||
for j, atTime := range scenario.in.atTime {
|
||||
actual := v.GetValueAtTime(fingerprint, atTime.time)
|
||||
|
||||
if len(actual) != len(scenario.out.atTime[j]) {
|
||||
t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(scenario.out.atTime[j]), len(actual))
|
||||
}
|
||||
|
||||
for k, value := range scenario.out.atTime[j] {
|
||||
if value.Value != actual[k].Value {
|
||||
t.Fatalf("%d.%d.%d expected %d value, got %d", i, j, k, value.Value, actual[j].Value)
|
||||
}
|
||||
if !value.Timestamp.Equal(actual[k].Timestamp) {
|
||||
t.Fatalf("%d.%d.%d expected %s timestamp, got %s", i, j, k, value.Timestamp, actual[j].Timestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tiered.Drain()
|
||||
}
|
||||
}
|
||||
|
||||
func TestMakeView(t *testing.T) {
|
||||
testMakeView(t)
|
||||
}
|
||||
|
||||
func BenchmarkMakeView(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
testMakeView(b)
|
||||
}
|
||||
}
|
|
@ -148,13 +148,13 @@ func (v view) GetValueAtTime(f model.Fingerprint, t time.Time) (s []model.Sample
|
|||
|
||||
s = append(s, model.SamplePair{
|
||||
Timestamp: time.Time(iterator.Key().(skipListTime)),
|
||||
Value: iterator.Value().(model.SampleValue),
|
||||
Value: iterator.Value().(value).get(),
|
||||
})
|
||||
|
||||
if iterator.Next() {
|
||||
if iterator.Previous() {
|
||||
s = append(s, model.SamplePair{
|
||||
Timestamp: time.Time(iterator.Key().(skipListTime)),
|
||||
Value: iterator.Value().(model.SampleValue),
|
||||
Value: iterator.Value().(value).get(),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue