Add regression tests for 'loop until op is consumed' bug.

- Most of this is the actual regression test in tiered_test.go.

- Working on that regression tests uncovered problems in
  tiered_test.go that are fixed in this commit.

- The 'op.consumed = false' line added to freelist.go was actually not
  fixing a bug. Instead, there was no bug at all. So this commit
  removes that line again, but adds a regression test to make sure
  that the assumed bug is indeed not there (cf. freelist_test.go).

- Removed more code duplication in operation.go (following the same
  approach as before, i.e. embedding op type A into op type B if
  everything in A is the same as in B with the exception of String()
  and ExtractSample()). (This change make struct literals for ops more
  clunky, but that only affects tests. No code change whatsoever was
  necessary in the actual code after this refactoring.)

- Fix another op leak in tiered.go.

Change-Id: Ia165c52e33290ad4f6aba9c83d92318d4f583517
This commit is contained in:
Bjoern Rabenstein 2014-03-10 15:04:42 +01:00
parent b470fb0672
commit c3b282bd14
6 changed files with 282 additions and 83 deletions

View file

@ -94,7 +94,6 @@ func (l *valueAtTimeList) Get(fp *clientmodel.Fingerprint, time clientmodel.Time
}
op.fp = *fp
op.current = time
op.consumed = false
return op
}

View file

@ -0,0 +1,37 @@
// Copyright 2014 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"testing"
"github.com/prometheus/client_golang/model"
)
// TestValueAtTimeListGet tests if the timestamp is set properly in the op
// retrieved from the free list and if the 'consumed' member is zeroed properly.
func TestValueAtTimeListGet(t *testing.T) {
l := newValueAtTimeList(1)
op := l.Get(&model.Fingerprint{}, 42)
op.consumed = true
l.Give(op)
op2 := l.Get(&model.Fingerprint{}, 4711)
if op2.Consumed() {
t.Error("Op retrieved from freelist is already consumed.")
}
if got, expected := op2.CurrentTime(), model.Timestamp(4711); got != expected {
t.Errorf("op2.CurrentTime() = %d; want %d.", got, expected)
}
}

View file

@ -137,50 +137,6 @@ func (g getValuesAtTimeOp) Consumed() bool {
return g.consumed
}
// getValuesAtIntervalOp encapsulates getting values at a given interval over a
// duration.
type getValuesAtIntervalOp struct {
baseOp
through clientmodel.Timestamp
interval time.Duration
}
func (g *getValuesAtIntervalOp) String() string {
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.current, g.interval, g.through)
}
func (g *getValuesAtIntervalOp) Through() clientmodel.Timestamp {
return g.through
}
func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
if len(in) == 0 {
return
}
lastChunkTime := in[len(in)-1].Timestamp
for len(in) > 0 {
out = append(out, extractValuesAroundTime(g.current, in)...)
lastExtractedTime := out[len(out)-1].Timestamp
in = in.TruncateBefore(lastExtractedTime.Add(
clientmodel.MinimumTick))
g.current = g.current.Add(g.interval)
for !g.current.After(lastExtractedTime) {
g.current = g.current.Add(g.interval)
}
if lastExtractedTime.Equal(lastChunkTime) {
break
}
if g.current.After(g.through) {
break
}
}
return
}
func (g *getValuesAtIntervalOp) Consumed() bool {
return g.current.After(g.through)
}
// getValuesAlongRangeOp encapsulates getting all values in a given range.
type getValuesAlongRangeOp struct {
baseOp
@ -232,6 +188,41 @@ func (g *getValuesAlongRangeOp) Consumed() bool {
return g.current.After(g.through)
}
// getValuesAtIntervalOp encapsulates getting values at a given interval over a
// duration.
type getValuesAtIntervalOp struct {
getValuesAlongRangeOp
interval time.Duration
}
func (g *getValuesAtIntervalOp) String() string {
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.current, g.interval, g.through)
}
func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
if len(in) == 0 {
return
}
lastChunkTime := in[len(in)-1].Timestamp
for len(in) > 0 {
out = append(out, extractValuesAroundTime(g.current, in)...)
lastExtractedTime := out[len(out)-1].Timestamp
in = in.TruncateBefore(lastExtractedTime.Add(
clientmodel.MinimumTick))
g.current = g.current.Add(g.interval)
for !g.current.After(lastExtractedTime) {
g.current = g.current.Add(g.interval)
}
if lastExtractedTime.Equal(lastChunkTime) {
break
}
if g.current.After(g.through) {
break
}
}
return
}
// getValueRangeAtIntervalOp encapsulates getting all values from ranges along
// intervals.
//
@ -239,17 +230,16 @@ func (g *getValuesAlongRangeOp) Consumed() bool {
// incremented by interval and from is reset to through-rangeDuration. Returns
// current time nil when from > totalThrough.
type getValueRangeAtIntervalOp struct {
baseOp
getValuesAtIntervalOp
rangeThrough clientmodel.Timestamp
rangeDuration time.Duration
interval time.Duration
through clientmodel.Timestamp
}
func (g *getValueRangeAtIntervalOp) String() string {
return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.current, g.interval, g.through)
}
// Through panics because the notion of 'through' is ambiguous for this op.
func (g *getValueRangeAtIntervalOp) Through() clientmodel.Timestamp {
panic("not implemented")
}
@ -297,10 +287,6 @@ func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) {
return in[firstIdx:lastIdx]
}
func (g *getValueRangeAtIntervalOp) Consumed() bool {
return g.current.After(g.through)
}
// getValuesAtIntervalOps contains getValuesAtIntervalOp operations. It
// implements sort.Interface and sorts the operations in ascending order by
// their frequency.

View file

@ -227,16 +227,20 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
// No values.
{
op: getValuesAtIntervalOp{
baseOp: baseOp{current: testInstant},
through: testInstant.Add(1 * time.Minute),
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant},
through: testInstant.Add(1 * time.Minute),
},
interval: 30 * time.Second,
},
},
// Entire operator range before first value.
{
op: getValuesAtIntervalOp{
baseOp: baseOp{current: testInstant},
through: testInstant.Add(1 * time.Minute),
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant},
through: testInstant.Add(1 * time.Minute),
},
interval: 30 * time.Second,
},
in: Values{
@ -259,8 +263,10 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
// Operator range starts before first value, ends within available values.
{
op: getValuesAtIntervalOp{
baseOp: baseOp{current: testInstant},
through: testInstant.Add(2 * time.Minute),
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant},
through: testInstant.Add(2 * time.Minute),
},
interval: 30 * time.Second,
},
in: Values{
@ -287,8 +293,10 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
// Entire operator range is within available values.
{
op: getValuesAtIntervalOp{
baseOp: baseOp{current: testInstant.Add(1 * time.Minute)},
through: testInstant.Add(2 * time.Minute),
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant.Add(1 * time.Minute)},
through: testInstant.Add(2 * time.Minute),
},
interval: 30 * time.Second,
},
in: Values{
@ -319,8 +327,10 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
// Operator range begins before first value, ends after last.
{
op: getValuesAtIntervalOp{
baseOp: baseOp{current: testInstant},
through: testInstant.Add(3 * time.Minute),
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant},
through: testInstant.Add(3 * time.Minute),
},
interval: 30 * time.Second,
},
in: Values{
@ -347,8 +357,10 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
// Operator range begins within available values, ends after the last value.
{
op: getValuesAtIntervalOp{
baseOp: baseOp{current: testInstant.Add(2 * time.Minute)},
through: testInstant.Add(4 * time.Minute),
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant.Add(2 * time.Minute)},
through: testInstant.Add(4 * time.Minute),
},
interval: 30 * time.Second,
},
in: Values{
@ -383,8 +395,10 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
// Entire operator range after the last available value.
{
op: getValuesAtIntervalOp{
baseOp: baseOp{current: testInstant.Add(2 * time.Minute)},
through: testInstant.Add(3 * time.Minute),
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant.Add(2 * time.Minute)},
through: testInstant.Add(3 * time.Minute),
},
interval: 30 * time.Second,
},
in: Values{
@ -411,8 +425,10 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
// skip over values for the test).
{
op: getValuesAtIntervalOp{
baseOp: baseOp{current: testInstant.Add(30 * time.Second)},
through: testInstant.Add(4 * time.Minute),
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant.Add(30 * time.Second)},
through: testInstant.Add(4 * time.Minute),
},
interval: 3 * time.Minute,
},
in: Values{
@ -648,11 +664,15 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
func TestGetValueRangeAtIntervalOp(t *testing.T) {
testOp := getValueRangeAtIntervalOp{
baseOp: baseOp{current: testInstant.Add(-2 * time.Minute)},
getValuesAtIntervalOp: getValuesAtIntervalOp{
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: testInstant.Add(-2 * time.Minute)},
through: testInstant.Add(20 * time.Minute),
},
interval: 10 * time.Minute,
},
rangeThrough: testInstant,
rangeDuration: 2 * time.Minute,
interval: 10 * time.Minute,
through: testInstant.Add(20 * time.Minute),
}
var scenarios = []struct {

View file

@ -417,6 +417,13 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
sampleKeyDto, _ := t.dtoSampleKeys.Get()
defer t.dtoSampleKeys.Give(sampleKeyDto)
defer func() {
// Give back all ops not yet popped.
for viewJob.builder.HasOp() {
giveBackOp(viewJob.builder.PopOp())
}
}()
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
for viewJob.builder.HasOp() {
op := viewJob.builder.PopOp()

View file

@ -14,6 +14,7 @@
package metric
import (
"math"
"sort"
"testing"
"time"
@ -41,6 +42,20 @@ func buildSamples(from, to clientmodel.Timestamp, interval time.Duration, m clie
return
}
func buildValues(firstValue clientmodel.SampleValue, from, to clientmodel.Timestamp, interval time.Duration) (v Values) {
for from.Before(to) {
v = append(v, SamplePair{
Value: firstValue,
Timestamp: from,
})
from = from.Add(interval)
firstValue++
}
return
}
func testMakeView(t test.Tester, flushToDisk bool) {
type in struct {
atTime []getValuesAtTimeOp
@ -313,9 +328,14 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
},
// Two chunks of samples, query asks for values from first chunk.
// Two chunks of samples, query asks for values from second chunk.
{
data: buildSamples(instant, instant.Add(time.Duration(*leveldbChunkSize*4)*time.Second), 2*time.Second, metric),
data: buildSamples(
instant,
instant.Add(time.Duration(*leveldbChunkSize*4)*time.Second),
2*time.Second,
metric,
),
in: in{
atTime: []getValuesAtTimeOp{
{
@ -338,6 +358,103 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
},
// Two chunks of samples, query asks for values between both chunks.
{
data: buildSamples(
instant,
instant.Add(time.Duration(*leveldbChunkSize*4)*time.Second),
2*time.Second,
metric,
),
in: in{
atTime: []getValuesAtTimeOp{
{
baseOp: baseOp{current: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2) - clientmodel.MinimumTick)},
},
},
},
out: out{
atTime: []Values{
{
{
Timestamp: instant.Add(time.Second * (time.Duration(*leveldbChunkSize*2) - 2)),
Value: 199,
},
{
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize*2)),
Value: 200,
},
},
},
},
},
// Two chunks of samples, getValuesAtIntervalOp spanning both.
{
data: buildSamples(
instant,
instant.Add(time.Duration(*leveldbChunkSize*6)*time.Second),
2*time.Second,
metric,
),
in: in{
atInterval: []getValuesAtIntervalOp{
{
getValuesAlongRangeOp: getValuesAlongRangeOp{
baseOp: baseOp{current: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2-4) - clientmodel.MinimumTick)},
through: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2+4) + clientmodel.MinimumTick),
},
interval: time.Second * 6,
},
},
},
out: out{
atInterval: []Values{
{
{
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize*2-6)),
Value: 197,
},
{
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize*2-4)),
Value: 198,
},
{
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize*2)),
Value: 200,
},
{
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize*2+2)),
Value: 201,
},
},
},
},
},
// Three chunks of samples, getValuesAlongRangeOp spanning all of them.
{
data: buildSamples(
instant,
instant.Add(time.Duration(*leveldbChunkSize*6)*time.Second),
2*time.Second,
metric,
),
in: in{
alongRange: []getValuesAlongRangeOp{
{
baseOp: baseOp{current: instant.Add(time.Second*time.Duration(*leveldbChunkSize*2-4) - clientmodel.MinimumTick)},
through: instant.Add(time.Second*time.Duration(*leveldbChunkSize*4+2) + clientmodel.MinimumTick),
},
},
},
out: out{
alongRange: []Values{buildValues(
clientmodel.SampleValue(198),
instant.Add(time.Second*time.Duration(*leveldbChunkSize*2-4)),
instant.Add(time.Second*time.Duration(*leveldbChunkSize*4+2)+clientmodel.MinimumTick),
2*time.Second,
)},
},
},
}
)
@ -373,14 +490,51 @@ func testMakeView(t test.Tester, flushToDisk bool) {
t.Fatalf("%d. failed due to %s", i, err)
}
for j, atTime := range scenario.in.atTime {
actual := v.GetValueAtTime(fingerprint, atTime.current)
// To get all values in the View, ask for the 'forever' interval.
interval := Interval{OldestInclusive: math.MinInt64, NewestInclusive: math.MaxInt64}
if len(actual) != len(scenario.out.atTime[j]) {
t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(scenario.out.atTime[j]), len(actual))
for j, atTime := range scenario.out.atTime {
actual := v.GetRangeValues(fingerprint, interval)
if len(actual) != len(atTime) {
t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(atTime), len(actual))
}
for k, value := range scenario.out.atTime[j] {
for k, value := range atTime {
if value.Value != actual[k].Value {
t.Errorf("%d.%d.%d expected %v value, got %v", i, j, k, value.Value, actual[k].Value)
}
if !value.Timestamp.Equal(actual[k].Timestamp) {
t.Errorf("%d.%d.%d expected %s timestamp, got %s", i, j, k, value.Timestamp, actual[k].Timestamp)
}
}
}
for j, atInterval := range scenario.out.atInterval {
actual := v.GetRangeValues(fingerprint, interval)
if len(actual) != len(atInterval) {
t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(atInterval), len(actual))
}
for k, value := range atInterval {
if value.Value != actual[k].Value {
t.Errorf("%d.%d.%d expected %v value, got %v", i, j, k, value.Value, actual[k].Value)
}
if !value.Timestamp.Equal(actual[k].Timestamp) {
t.Errorf("%d.%d.%d expected %s timestamp, got %s", i, j, k, value.Timestamp, actual[k].Timestamp)
}
}
}
for j, alongRange := range scenario.out.alongRange {
actual := v.GetRangeValues(fingerprint, interval)
if len(actual) != len(alongRange) {
t.Fatalf("%d.%d. expected %d output, got %d", i, j, len(alongRange), len(actual))
}
for k, value := range alongRange {
if value.Value != actual[k].Value {
t.Fatalf("%d.%d.%d expected %v value, got %v", i, j, k, value.Value, actual[k].Value)
}
@ -571,7 +725,7 @@ func TestGetFingerprintsForLabelSet(t *testing.T) {
}
}
func testTruncateBefore(t test.Tester) {
func TestTruncateBefore(t *testing.T) {
type in struct {
values Values
time clientmodel.Timestamp
@ -726,10 +880,6 @@ func testTruncateBefore(t test.Tester) {
}
}
func TestTruncateBefore(t *testing.T) {
testTruncateBefore(t)
}
func TestGetMetricForFingerprintCachesCopyOfMetric(t *testing.T) {
ts, closer := NewTestTieredStorage(t)
defer closer.Close()