mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Merge pull request #176 from prometheus/optimization/view-materialization/slice-chunking
Truncate irrelevant chunk values.
This commit is contained in:
commit
af7ddc36e2
|
@ -75,6 +75,10 @@ type Metric map[LabelName]LabelValue
|
|||
// time.
|
||||
type SampleValue float64
|
||||
|
||||
func (s SampleValue) Equal(o SampleValue) bool {
|
||||
return s == o
|
||||
}
|
||||
|
||||
func (s SampleValue) ToDTO() *float64 {
|
||||
return proto.Float64(float64(s))
|
||||
}
|
||||
|
@ -92,6 +96,10 @@ type SamplePair struct {
|
|||
Timestamp time.Time
|
||||
}
|
||||
|
||||
func (s SamplePair) Equal(o SamplePair) bool {
|
||||
return s.Value.Equal(o.Value) && s.Timestamp.Equal(o.Timestamp)
|
||||
}
|
||||
|
||||
type Values []SamplePair
|
||||
|
||||
func (v Values) Len() int {
|
||||
|
@ -136,6 +144,28 @@ func (v Values) InsideInterval(t time.Time) (s bool) {
|
|||
return true
|
||||
}
|
||||
|
||||
// TruncateBefore returns a subslice of the original such that extraneous
|
||||
// samples in the collection that occur before the provided time are
|
||||
// dropped. The original slice is not mutated.
|
||||
func (v Values) TruncateBefore(t time.Time) (values Values) {
|
||||
index := sort.Search(len(v), func(i int) bool {
|
||||
timestamp := v[i].Timestamp
|
||||
|
||||
return !timestamp.Before(t)
|
||||
})
|
||||
|
||||
switch index {
|
||||
case 0:
|
||||
values = v
|
||||
case len(v):
|
||||
values = v[len(v)-1:]
|
||||
default:
|
||||
values = v[index-1:]
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) {
|
||||
for _, value := range dto.Value {
|
||||
v = append(v, SamplePair{
|
||||
|
|
|
@ -16,6 +16,7 @@ package model
|
|||
import (
|
||||
"github.com/prometheus/prometheus/utility/test"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func testMetric(t test.Tester) {
|
||||
|
@ -77,3 +78,162 @@ func BenchmarkMetric(b *testing.B) {
|
|||
testMetric(b)
|
||||
}
|
||||
}
|
||||
|
||||
func testValues(t test.Tester) {
|
||||
type in struct {
|
||||
values Values
|
||||
time time.Time
|
||||
}
|
||||
instant := time.Now()
|
||||
var scenarios = []struct {
|
||||
in in
|
||||
out Values
|
||||
}{
|
||||
{
|
||||
in: in{
|
||||
time: instant,
|
||||
values: Values{
|
||||
{
|
||||
Value: 0,
|
||||
Timestamp: instant,
|
||||
},
|
||||
{
|
||||
Value: 1,
|
||||
Timestamp: instant.Add(time.Second),
|
||||
},
|
||||
{
|
||||
Value: 2,
|
||||
Timestamp: instant.Add(2 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 3,
|
||||
Timestamp: instant.Add(3 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 4,
|
||||
Timestamp: instant.Add(4 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
out: Values{
|
||||
{
|
||||
Value: 0,
|
||||
Timestamp: instant,
|
||||
},
|
||||
{
|
||||
Value: 1,
|
||||
Timestamp: instant.Add(time.Second),
|
||||
},
|
||||
{
|
||||
Value: 2,
|
||||
Timestamp: instant.Add(2 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 3,
|
||||
Timestamp: instant.Add(3 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 4,
|
||||
Timestamp: instant.Add(4 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
in: in{
|
||||
time: instant.Add(2 * time.Second),
|
||||
values: Values{
|
||||
{
|
||||
Value: 0,
|
||||
Timestamp: instant,
|
||||
},
|
||||
{
|
||||
Value: 1,
|
||||
Timestamp: instant.Add(time.Second),
|
||||
},
|
||||
{
|
||||
Value: 2,
|
||||
Timestamp: instant.Add(2 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 3,
|
||||
Timestamp: instant.Add(3 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 4,
|
||||
Timestamp: instant.Add(4 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
out: Values{
|
||||
{
|
||||
Value: 1,
|
||||
Timestamp: instant.Add(time.Second),
|
||||
},
|
||||
{
|
||||
Value: 2,
|
||||
Timestamp: instant.Add(2 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 3,
|
||||
Timestamp: instant.Add(3 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 4,
|
||||
Timestamp: instant.Add(4 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
in: in{
|
||||
time: instant.Add(5 * time.Second),
|
||||
values: Values{
|
||||
{
|
||||
Value: 0,
|
||||
Timestamp: instant,
|
||||
},
|
||||
{
|
||||
Value: 1,
|
||||
Timestamp: instant.Add(time.Second),
|
||||
},
|
||||
{
|
||||
Value: 2,
|
||||
Timestamp: instant.Add(2 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 3,
|
||||
Timestamp: instant.Add(3 * time.Second),
|
||||
},
|
||||
{
|
||||
Value: 4,
|
||||
Timestamp: instant.Add(4 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
out: Values{
|
||||
// Preserve the last value in case it needs to be used for the next set.
|
||||
{
|
||||
Value: 4,
|
||||
Timestamp: instant.Add(4 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, scenario := range scenarios {
|
||||
actual := scenario.in.values.TruncateBefore(scenario.in.time)
|
||||
|
||||
if len(actual) != len(scenario.out) {
|
||||
t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(actual))
|
||||
}
|
||||
|
||||
for j, actualValue := range actual {
|
||||
if !actualValue.Equal(scenario.out[j]) {
|
||||
t.Fatalf("%d.%d. expected %s, got %s", i, j, scenario.out[j], actualValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValues(t *testing.T) {
|
||||
testValues(t)
|
||||
}
|
||||
|
|
|
@ -417,6 +417,8 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
|
|||
break
|
||||
}
|
||||
|
||||
chunk = chunk.TruncateBefore(targetTime)
|
||||
|
||||
lastChunkTime := chunk[len(chunk)-1].Timestamp
|
||||
if lastChunkTime.After(targetTime) {
|
||||
targetTime = lastChunkTime
|
||||
|
@ -428,6 +430,9 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
|
|||
if op.CurrentTime().After(targetTime) {
|
||||
break
|
||||
}
|
||||
|
||||
chunk = chunk.TruncateBefore(*(op.CurrentTime()))
|
||||
|
||||
for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) {
|
||||
out = op.ExtractSamples(chunk)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue