Merge pull request #139 from prometheus/julius-reduce-sample-jitter

Fix scrape timestamps to reduce sample time jitter.
This commit is contained in:
Matt T. Proud 2013-04-13 06:57:07 -07:00
commit e6f67c3a3e
4 changed files with 11 additions and 8 deletions

View file

@ -16,6 +16,7 @@ package format
import (
"github.com/prometheus/prometheus/model"
"io"
"time"
)
// Processor is responsible for decoding the actual message responses from
@ -23,5 +24,5 @@ import (
// to the results channel.
type Processor interface {
// Process performs the work on the input and closes the incoming stream.
Process(stream io.ReadCloser, baseLabels model.LabelSet, results chan Result) (err error)
Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error)
}

View file

@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/utility"
"io"
"io/ioutil"
"time"
)
const (
@ -57,7 +58,7 @@ type entity001 []struct {
} `json:"metric"`
}
func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet, results chan Result) (err error) {
func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) {
// TODO(matt): Replace with plain-jane JSON unmarshalling.
defer stream.Close()
@ -73,8 +74,6 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet,
return
}
now := p.time.Now()
// TODO(matt): This outer loop is a great basis for parallelization.
for _, entity := range entities {
for _, value := range entity.Metric.Value {
@ -101,7 +100,7 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet,
sample := model.Sample{
Metric: metric,
Timestamp: now,
Timestamp: timestamp,
Value: model.SampleValue(sampleValue),
}
@ -136,7 +135,7 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet,
sample := model.Sample{
Metric: childMetric,
Timestamp: now,
Timestamp: timestamp,
Value: model.SampleValue(individualValue),
}

View file

@ -21,6 +21,7 @@ import (
"io/ioutil"
"strings"
"testing"
"time"
)
func testProcessor001Process(t test.Tester) {
@ -172,7 +173,7 @@ func testProcessor001Process(t test.Tester) {
reader := strings.NewReader(scenario.in)
err := Processor001.Process(ioutil.NopCloser(reader), model.LabelSet{}, inputChannel)
err := Processor001.Process(ioutil.NopCloser(reader), time.Now(), model.LabelSet{}, inputChannel)
if !test.ErrorEqual(scenario.err, err) {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
continue

View file

@ -162,6 +162,8 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err
done <- true
}()
now := time.Now()
var resp *http.Response // Don't shadow "err" from the enclosing function.
resp, err = http.Get(t.Address())
if err != nil {
@ -182,7 +184,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err
baseLabels[baseLabel] = baseValue
}
err = processor.Process(resp.Body, baseLabels, results)
err = processor.Process(resp.Body, now, baseLabels, results)
if err != nil {
return
}