Merge pull request #5 from matttproud/feature/scraping-infrastructure

Full End-to-End Scraping Infrastructure
This commit is contained in:
juliusv 2013-01-04 08:36:36 -08:00
commit 8f010c9801
4 changed files with 289 additions and 88 deletions

22
main.go
View file

@ -19,6 +19,7 @@ import (
"github.com/matttproud/prometheus/storage/metric/leveldb" "github.com/matttproud/prometheus/storage/metric/leveldb"
"log" "log"
"os" "os"
"time"
) )
func main() { func main() {
@ -32,21 +33,22 @@ func main() {
m.Close() m.Close()
}() }()
results := make(chan retrieval.Result, 4096)
t := &retrieval.Target{ t := &retrieval.Target{
Address: "http://localhost:8080/metrics.json", Address: "http://localhost:8080/metrics.json",
Deadline: time.Second * 5,
Interval: time.Second * 3,
} }
for i := 0; i < 100000; i++ { manager := retrieval.NewTargetManager(results, 1)
c, err := t.Scrape() manager.Add(t)
if err != nil {
fmt.Println(err)
continue
}
for _, s := range c { for {
result := <-results
fmt.Printf("result -> %s\n", result)
for _, s := range result.Samples {
m.AppendSample(&s) m.AppendSample(&s)
} }
fmt.Printf("Finished %d\n", i)
} }
} }

View file

@ -10,10 +10,17 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package retrieval package retrieval
import ( import (
"encoding/json"
"fmt"
"github.com/matttproud/prometheus/model"
"io/ioutil"
"log"
"math"
"net/http"
"strconv"
"time" "time"
) )
@ -25,96 +32,126 @@ const (
UNREACHABLE UNREACHABLE
) )
const (
MAXIMUM_BACKOFF = time.Minute * 30
)
type Target struct { type Target struct {
scheduledFor time.Time scheduledFor time.Time
unreachableCount int unreachableCount int
state TargetState state TargetState
Address string Address string
Deadline time.Duration
// XXX: Move this to a field with the target manager initialization instead of here.
Interval time.Duration Interval time.Duration
} }
// KEPT FOR LEGACY COMPATIBILITY; PENDING REFACTOR type Result struct {
Err error
Samples []model.Sample
Target Target
}
func (t *Target) Scrape() (samples []model.Sample, err error) { func (t *Target) reschedule(s TargetState) {
defer func() { currentState := t.state
if err != nil {
t.state = ALIVE switch currentState {
case UNKNOWN, UNREACHABLE:
switch s {
case ALIVE:
t.unreachableCount = 0
case UNREACHABLE:
backoff := MAXIMUM_BACKOFF
exponential := time.Duration(math.Pow(2, float64(t.unreachableCount))) * time.Second
if backoff > exponential {
backoff = exponential
}
t.scheduledFor = time.Now().Add(backoff)
t.unreachableCount++
log.Printf("%s unavailable %s times deferred for %s.", t, t.unreachableCount, backoff)
default:
} }
case ALIVE:
switch s {
case UNREACHABLE:
t.unreachableCount++
}
default:
}
if s != currentState {
log.Printf("%s transitioning from %s to %s.", t, currentState, s)
}
t.state = s
}
func (t *Target) Scrape(results chan Result) (err error) {
result := Result{}
defer func() {
futureState := t.state
switch err {
case nil:
futureState = ALIVE
default:
futureState = UNREACHABLE
}
t.reschedule(futureState)
result.Err = err
results <- result
}() }()
ti := time.Now() done := make(chan bool)
resp, err := http.Get(t.Address)
if err != nil {
return
}
defer resp.Body.Close() go func() {
ti := time.Now()
raw, err := ioutil.ReadAll(resp.Body) resp, err := http.Get(t.Address)
if err != nil { if err != nil {
return return
}
intermediate := make(map[string]interface{})
err = json.Unmarshal(raw, &intermediate)
if err != nil {
return
}
baseLabels := map[string]string{"instance": t.Address}
for name, v := range intermediate {
asMap, ok := v.(map[string]interface{})
if !ok {
continue
} }
switch asMap["type"] { defer resp.Body.Close()
case "counter":
m := model.Metric{} raw, err := ioutil.ReadAll(resp.Body)
m["name"] = model.LabelValue(name) if err != nil {
asFloat, ok := asMap["value"].(float64) return
}
intermediate := make(map[string]interface{})
err = json.Unmarshal(raw, &intermediate)
if err != nil {
return
}
baseLabels := map[string]string{"instance": t.Address}
for name, v := range intermediate {
asMap, ok := v.(map[string]interface{})
if !ok { if !ok {
continue continue
} }
s := model.Sample{ switch asMap["type"] {
Metric: m, case "counter":
Value: model.SampleValue(asFloat), m := model.Metric{}
Timestamp: ti, m["name"] = model.LabelValue(name)
} asFloat, ok := asMap["value"].(float64)
for baseK, baseV := range baseLabels {
m[model.LabelName(baseK)] = model.LabelValue(baseV)
}
samples = append(samples, s)
case "histogram":
values, ok := asMap["value"].(map[string]interface{})
if !ok {
continue
}
for p, pValue := range values {
asString, ok := pValue.(string)
if !ok { if !ok {
continue continue
} }
float, err := strconv.ParseFloat(asString, 64)
if err != nil {
continue
}
m := model.Metric{}
m["name"] = model.LabelValue(name)
m["percentile"] = model.LabelValue(p)
s := model.Sample{ s := model.Sample{
Metric: m, Metric: m,
Value: model.SampleValue(float), Value: model.SampleValue(asFloat),
Timestamp: ti, Timestamp: ti,
} }
@ -122,9 +159,51 @@ func (t *Target) Scrape() (samples []model.Sample, err error) {
m[model.LabelName(baseK)] = model.LabelValue(baseV) m[model.LabelName(baseK)] = model.LabelValue(baseV)
} }
samples = append(samples, s) result.Samples = append(result.Samples, s)
case "histogram":
values, ok := asMap["value"].(map[string]interface{})
if !ok {
continue
}
for p, pValue := range values {
asString, ok := pValue.(string)
if !ok {
continue
}
float, err := strconv.ParseFloat(asString, 64)
if err != nil {
continue
}
m := model.Metric{}
m["name"] = model.LabelValue(name)
m["percentile"] = model.LabelValue(p)
s := model.Sample{
Metric: m,
Value: model.SampleValue(float),
Timestamp: ti,
}
for baseK, baseV := range baseLabels {
m[model.LabelName(baseK)] = model.LabelValue(baseV)
}
result.Samples = append(result.Samples, s)
}
} }
} }
done <- true
}()
select {
case <-done:
break
case <-time.After(t.Deadline):
err = fmt.Errorf("Target %s exceeded %s deadline.", t, t.Deadline)
} }
return return

View file

@ -0,0 +1,66 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"container/heap"
"log"
"time"
)
type TargetManager interface {
acquire()
release()
Add(t *Target)
Remove(t *Target)
}
type targetManager struct {
requestAllowance chan bool
pools map[time.Duration]TargetPool
results chan Result
}
func NewTargetManager(results chan Result, requestAllowance int) TargetManager {
return targetManager{
requestAllowance: make(chan bool, requestAllowance),
results: results,
pools: make(map[time.Duration]TargetPool),
}
}
func (m targetManager) acquire() {
m.requestAllowance <- true
}
func (m targetManager) release() {
<-m.requestAllowance
}
func (m targetManager) Add(t *Target) {
targetPool, ok := m.pools[t.Interval]
if !ok {
targetPool.manager = m
log.Printf("Pool %s does not exist; creating and starting...", t.Interval)
go targetPool.Run(m.results, t.Interval)
}
heap.Push(&targetPool, t)
m.pools[t.Interval] = targetPool
}
func (m targetManager) Remove(t *Target) {
panic("not implemented")
}

View file

@ -1,38 +1,92 @@
package retrieval package retrieval
import ( import (
"encoding/json" "container/heap"
"github.com/matttproud/prometheus/model" "log"
"io/ioutil"
"net/http"
"strconv"
"time" "time"
) )
type TargetPool []*Target type TargetPool struct {
done chan bool
targets []*Target
manager TargetManager
}
func NewTargetPool(m TargetManager) (p TargetPool) {
p.manager = m
return
}
func (p TargetPool) Len() int { func (p TargetPool) Len() int {
return len(p) return len(p.targets)
} }
func (p TargetPool) Less(i, j int) bool { func (p TargetPool) Less(i, j int) bool {
return p[i].scheduledFor.Before(p[j].scheduledFor) return p.targets[i].scheduledFor.Before(p.targets[j].scheduledFor)
} }
func (p *TargetPool) Pop() interface{} { func (p *TargetPool) Pop() interface{} {
oldPool := *p oldPool := p.targets
futureLength := p.Len() - 1 futureLength := p.Len() - 1
element := oldPool[futureLength] element := oldPool[futureLength]
futurePool := oldPool[0:futureLength] futurePool := oldPool[0:futureLength]
*p = futurePool p.targets = futurePool
return element return element
} }
func (p *TargetPool) Push(element interface{}) { func (p *TargetPool) Push(element interface{}) {
*p = append(*p, element.(*Target)) p.targets = append(p.targets, element.(*Target))
} }
func (p TargetPool) Swap(i, j int) { func (p TargetPool) Swap(i, j int) {
p[i], p[j] = p[j], p[i] p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
}
func (p *TargetPool) Run(results chan Result, interval time.Duration) {
ticker := time.Tick(interval)
for {
select {
case <-ticker:
p.runIteration(results)
case <-p.done:
log.Printf("TargetPool exiting...")
break
}
}
}
func (p TargetPool) Stop() {
p.done <- true
}
func (p *TargetPool) runSingle(results chan Result, t *Target) {
p.manager.acquire()
defer p.manager.release()
t.Scrape(results)
}
func (p *TargetPool) runIteration(results chan Result) {
for i := 0; i < p.Len(); i++ {
target := heap.Pop(p).(*Target)
if target == nil {
break
}
now := time.Now()
if target.scheduledFor.After(now) {
heap.Push(p, target)
break
}
go func() {
p.runSingle(results, target)
heap.Push(p, target)
}()
}
} }