mirror of https://github.com/prometheus/prometheus.git
Use the `TargetManager` for targets.
This commit is contained in:
parent 45a3e0a182
commit 2922def8d0
main.go | 22
@@ -19,6 +19,7 @@ import (
	"github.com/matttproud/prometheus/storage/metric/leveldb"
	"log"
	"os"
	"time"
)

func main() {
@@ -32,21 +33,22 @@ func main() {
		m.Close()
	}()

	results := make(chan retrieval.Result, 4096)

	t := &retrieval.Target{
		Address: "http://localhost:8080/metrics.json",
		Address: "http://localhost:8080/metrics.json",
		Deadline: time.Second * 5,
		Interval: time.Second * 3,
	}

	for i := 0; i < 100000; i++ {
		c, err := t.Scrape()
		if err != nil {
			fmt.Println(err)
			continue
		}
	manager := retrieval.NewTargetManager(results, 1)
	manager.Add(t)

		for _, s := range c {
	for {
		result := <-results
		fmt.Printf("result -> %s\n", result)
		for _, s := range result.Samples {
			m.AppendSample(&s)
		}

		fmt.Printf("Finished %d\n", i)
		}
	}
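For context, the shape main.go moves to in this hunk is: scrapes are scheduled by the `TargetManager` and delivered on a shared results channel, and main only drains that channel and appends samples. A minimal, self-contained sketch of that consumer pattern follows; the `sample` and `result` types are hypothetical stand-ins for `model.Sample` and `retrieval.Result`, not the repository's API.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins for model.Sample and retrieval.Result.
type sample struct {
	name  string
	value float64
}

type result struct {
	err     error
	samples []sample
}

func main() {
	// Buffered, as in the diff, so producers can run ahead of the consumer.
	results := make(chan result, 4096)

	// Stand-in for what TargetManager.Add arranges: a goroutine that emits
	// one result per interval for its target.
	go func() {
		for {
			results <- result{samples: []sample{{name: "requests_total", value: 1}}}
			time.Sleep(3 * time.Second)
		}
	}()

	// The consumer loop main.go switches to: block on results and handle
	// each sample (the real code calls m.AppendSample(&s)).
	for {
		r := <-results
		if r.err != nil {
			fmt.Println(r.err)
			continue
		}
		for _, s := range r.samples {
			fmt.Printf("append %s=%v\n", s.name, s.value)
		}
	}
}
```

The buffered channel (4096 in the diff) lets scrapers run ahead of the storage append loop without blocking immediately.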
@@ -10,10 +10,17 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"encoding/json"
	"fmt"
	"github.com/matttproud/prometheus/model"
	"io/ioutil"
	"log"
	"math"
	"net/http"
	"strconv"
	"time"
)
@@ -25,96 +32,126 @@ const (
	UNREACHABLE
)

const (
	MAXIMUM_BACKOFF = time.Minute * 30
)

type Target struct {
	scheduledFor time.Time
	unreachableCount int
	state TargetState

	Address string
	Deadline time.Duration

	// XXX: Move this to a field with the target manager initialization instead of here.
	Interval time.Duration
}

// KEPT FOR LEGACY COMPATIBILITY; PENDING REFACTOR
type Result struct {
	Err error
	Samples []model.Sample
	Target Target
}

func (t *Target) Scrape() (samples []model.Sample, err error) {
	defer func() {
		if err != nil {
			t.state = ALIVE
func (t *Target) reschedule(s TargetState) {
	currentState := t.state

	switch currentState {
	case UNKNOWN, UNREACHABLE:
		switch s {
		case ALIVE:
			t.unreachableCount = 0
		case UNREACHABLE:
			backoff := MAXIMUM_BACKOFF
			exponential := time.Duration(math.Pow(2, float64(t.unreachableCount))) * time.Second
			if backoff > exponential {
				backoff = exponential
			}

			t.scheduledFor = time.Now().Add(backoff)
			t.unreachableCount++

			log.Printf("%s unavailable %s times deferred for %s.", t, t.unreachableCount, backoff)
		default:
		}
	case ALIVE:
		switch s {
		case UNREACHABLE:
			t.unreachableCount++
		}
	default:
	}

	if s != currentState {
		log.Printf("%s transitioning from %s to %s.", t, currentState, s)
	}

	t.state = s
}

func (t *Target) Scrape(results chan Result) (err error) {
	result := Result{}

	defer func() {
		futureState := t.state

		switch err {
		case nil:
			futureState = ALIVE
		default:
			futureState = UNREACHABLE
		}

		t.reschedule(futureState)

		result.Err = err
		results <- result
	}()

	ti := time.Now()
	resp, err := http.Get(t.Address)
	if err != nil {
		return
	}
	done := make(chan bool)

	defer resp.Body.Close()

	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return
	}

	intermediate := make(map[string]interface{})
	err = json.Unmarshal(raw, &intermediate)
	if err != nil {
		return
	}

	baseLabels := map[string]string{"instance": t.Address}

	for name, v := range intermediate {
		asMap, ok := v.(map[string]interface{})

		if !ok {
			continue
	go func() {
		ti := time.Now()
		resp, err := http.Get(t.Address)
		if err != nil {
			return
		}

		switch asMap["type"] {
		case "counter":
			m := model.Metric{}
			m["name"] = model.LabelValue(name)
			asFloat, ok := asMap["value"].(float64)
		defer resp.Body.Close()

		raw, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return
		}

		intermediate := make(map[string]interface{})
		err = json.Unmarshal(raw, &intermediate)
		if err != nil {
			return
		}

		baseLabels := map[string]string{"instance": t.Address}

		for name, v := range intermediate {
			asMap, ok := v.(map[string]interface{})

			if !ok {
				continue
			}

			s := model.Sample{
				Metric: m,
				Value: model.SampleValue(asFloat),
				Timestamp: ti,
			}

			for baseK, baseV := range baseLabels {
				m[model.LabelName(baseK)] = model.LabelValue(baseV)
			}

			samples = append(samples, s)
		case "histogram":
			values, ok := asMap["value"].(map[string]interface{})
			if !ok {
				continue
			}

			for p, pValue := range values {
				asString, ok := pValue.(string)
			switch asMap["type"] {
			case "counter":
				m := model.Metric{}
				m["name"] = model.LabelValue(name)
				asFloat, ok := asMap["value"].(float64)
				if !ok {
					continue
				}

				float, err := strconv.ParseFloat(asString, 64)
				if err != nil {
					continue
				}

				m := model.Metric{}
				m["name"] = model.LabelValue(name)
				m["percentile"] = model.LabelValue(p)

				s := model.Sample{
					Metric: m,
					Value: model.SampleValue(float),
					Value: model.SampleValue(asFloat),
					Timestamp: ti,
				}

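Worth calling out from the hunk above: `reschedule` computes a capped exponential backoff when a target stays unreachable. A standalone sketch of just that computation (constant and function names below are illustrative, not the commit's identifiers):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Mirrors MAXIMUM_BACKOFF from the diff.
const maximumBackoff = 30 * time.Minute

// backoffFor mirrors the clamp in Target.reschedule: 2^unreachableCount
// seconds, capped at the maximum backoff.
func backoffFor(unreachableCount int) time.Duration {
	exponential := time.Duration(math.Pow(2, float64(unreachableCount))) * time.Second
	if exponential > maximumBackoff {
		return maximumBackoff
	}
	return exponential
}

func main() {
	for count := 0; count <= 12; count++ {
		fmt.Printf("failure #%d -> retry in %s\n", count, backoffFor(count))
	}
}
```

As in the diff, the delay grows from one second and is clamped at 30 minutes, so a flapping target is retried quickly at first and then no more than every half hour.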
@@ -122,9 +159,51 @@ func (t *Target) Scrape() (samples []model.Sample, err error) {
					m[model.LabelName(baseK)] = model.LabelValue(baseV)
				}

				samples = append(samples, s)
				result.Samples = append(result.Samples, s)
			case "histogram":
				values, ok := asMap["value"].(map[string]interface{})
				if !ok {
					continue
				}

				for p, pValue := range values {
					asString, ok := pValue.(string)
					if !ok {
						continue
					}

					float, err := strconv.ParseFloat(asString, 64)
					if err != nil {
						continue
					}

					m := model.Metric{}
					m["name"] = model.LabelValue(name)
					m["percentile"] = model.LabelValue(p)

					s := model.Sample{
						Metric: m,
						Value: model.SampleValue(float),
						Timestamp: ti,
					}

					for baseK, baseV := range baseLabels {
						m[model.LabelName(baseK)] = model.LabelValue(baseV)
					}

					result.Samples = append(result.Samples, s)
				}
			}
		}

		done <- true
	}()

	select {
	case <-done:
		break
	case <-time.After(t.Deadline):
		err = fmt.Errorf("Target %s exceeded %s deadline.", t, t.Deadline)
	}

	return
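The reworked `Scrape` above runs the HTTP fetch in a goroutine and races its completion against the target's `Deadline` with `select`/`time.After`. A self-contained sketch of the same pattern; function and type names are illustrative, and unlike the diff the worker reports its outcome over a buffered channel so a timed-out goroutine never blocks on a reader:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

type fetchOutcome struct {
	body []byte
	err  error
}

// fetchWithDeadline mirrors the shape of the new Scrape: do the work in a
// goroutine, then select on completion versus a deadline timer.
func fetchWithDeadline(url string, deadline time.Duration) ([]byte, error) {
	done := make(chan fetchOutcome, 1) // buffered: a late finisher never blocks

	go func() {
		resp, err := http.Get(url)
		if err != nil {
			done <- fetchOutcome{err: err}
			return
		}
		defer resp.Body.Close()
		body, err := ioutil.ReadAll(resp.Body)
		done <- fetchOutcome{body: body, err: err}
	}()

	select {
	case outcome := <-done:
		return outcome.body, outcome.err
	case <-time.After(deadline):
		return nil, fmt.Errorf("target %s exceeded %s deadline", url, deadline)
	}
}

func main() {
	body, err := fetchWithDeadline("http://localhost:8080/metrics.json", 5*time.Second)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("fetched %d bytes\n", len(body))
}
```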
retrieval/targetmanager.go | 66 (new file)
@@ -0,0 +1,66 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"container/heap"
	"log"
	"time"
)

type TargetManager interface {
	acquire()
	release()
	Add(t *Target)
	Remove(t *Target)
}

type targetManager struct {
	requestAllowance chan bool
	pools map[time.Duration]TargetPool
	results chan Result
}

func NewTargetManager(results chan Result, requestAllowance int) TargetManager {
	return targetManager{
		requestAllowance: make(chan bool, requestAllowance),
		results: results,
		pools: make(map[time.Duration]TargetPool),
	}
}

func (m targetManager) acquire() {
	m.requestAllowance <- true
}

func (m targetManager) release() {
	<-m.requestAllowance
}

func (m targetManager) Add(t *Target) {
	targetPool, ok := m.pools[t.Interval]

	if !ok {
		targetPool.manager = m
		log.Printf("Pool %s does not exist; creating and starting...", t.Interval)
		go targetPool.Run(m.results, t.Interval)
	}

	heap.Push(&targetPool, t)
	m.pools[t.Interval] = targetPool
}

func (m targetManager) Remove(t *Target) {
	panic("not implemented")
}
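The `acquire`/`release` pair in the new file is a counting semaphore built from a buffered channel: the channel's capacity (`requestAllowance`) bounds how many scrapes may run at once. A minimal sketch of that idiom outside the manager, with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// semaphore mirrors targetManager's requestAllowance channel: a buffered
// channel whose capacity bounds the number of concurrent workers.
type semaphore chan bool

func (s semaphore) acquire() { s <- true } // blocks once the buffer is full
func (s semaphore) release() { <-s }

func main() {
	sem := make(semaphore, 2) // at most two concurrent "scrapes"
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem.acquire()
			defer sem.release()
			fmt.Printf("scrape %d running\n", id)
			time.Sleep(100 * time.Millisecond)
		}(i)
	}
	wg.Wait()
}
```

This is how `TargetPool.runSingle` (below) keeps the pools from issuing more simultaneous scrapes than the manager was constructed to allow.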
@@ -1,38 +1,92 @@
package retrieval

import (
	"encoding/json"
	"github.com/matttproud/prometheus/model"
	"io/ioutil"
	"net/http"
	"strconv"
	"container/heap"
	"log"
	"time"
)

type TargetPool []*Target
type TargetPool struct {
	done chan bool
	targets []*Target
	manager TargetManager
}

func NewTargetPool(m TargetManager) (p TargetPool) {
	p.manager = m

	return
}

func (p TargetPool) Len() int {
	return len(p)
	return len(p.targets)
}

func (p TargetPool) Less(i, j int) bool {
	return p[i].scheduledFor.Before(p[j].scheduledFor)
	return p.targets[i].scheduledFor.Before(p.targets[j].scheduledFor)
}

func (p *TargetPool) Pop() interface{} {
	oldPool := *p
	oldPool := p.targets
	futureLength := p.Len() - 1
	element := oldPool[futureLength]
	futurePool := oldPool[0:futureLength]
	*p = futurePool
	p.targets = futurePool

	return element
}

func (p *TargetPool) Push(element interface{}) {
	*p = append(*p, element.(*Target))
	p.targets = append(p.targets, element.(*Target))
}

func (p TargetPool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
}

func (p *TargetPool) Run(results chan Result, interval time.Duration) {
	ticker := time.Tick(interval)

	for {
		select {
		case <-ticker:
			p.runIteration(results)
		case <-p.done:
			log.Printf("TargetPool exiting...")
			break
		}
	}
}

func (p TargetPool) Stop() {
	p.done <- true
}

func (p *TargetPool) runSingle(results chan Result, t *Target) {
	p.manager.acquire()
	defer p.manager.release()

	t.Scrape(results)
}

func (p *TargetPool) runIteration(results chan Result) {
	for i := 0; i < p.Len(); i++ {
		target := heap.Pop(p).(*Target)
		if target == nil {
			break
		}

		now := time.Now()

		if target.scheduledFor.After(now) {
			heap.Push(p, target)

			break
		}

		go func() {
			p.runSingle(results, target)
			heap.Push(p, target)
		}()
	}
}
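After this change `TargetPool` still implements `container/heap`'s interface, but the methods go through an embedded `targets` slice rather than treating the pool itself as a slice, ordered by each target's `scheduledFor` time so the most overdue target is popped first. A stripped-down, self-contained sketch of that structure; the types below are simplified stand-ins, not the repository's:

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// Simplified analogue of the reworked TargetPool: heap methods operate on an
// embedded slice, keyed on scheduledFor so the earliest entry surfaces first.
type target struct {
	address      string
	scheduledFor time.Time
}

type pool struct {
	targets []*target
}

func (p pool) Len() int           { return len(p.targets) }
func (p pool) Less(i, j int) bool { return p.targets[i].scheduledFor.Before(p.targets[j].scheduledFor) }
func (p pool) Swap(i, j int)      { p.targets[i], p.targets[j] = p.targets[j], p.targets[i] }

func (p *pool) Push(x interface{}) { p.targets = append(p.targets, x.(*target)) }
func (p *pool) Pop() interface{} {
	n := len(p.targets) - 1
	t := p.targets[n]
	p.targets = p.targets[:n]
	return t
}

func main() {
	now := time.Now()
	p := &pool{}
	heap.Push(p, &target{"http://a:8080/metrics.json", now.Add(10 * time.Second)})
	heap.Push(p, &target{"http://b:8080/metrics.json", now.Add(-5 * time.Second)})
	heap.Push(p, &target{"http://c:8080/metrics.json", now})

	for p.Len() > 0 {
		t := heap.Pop(p).(*target)
		fmt.Println(t.address, "due", t.scheduledFor.Format(time.RFC3339))
	}
}
```

`runIteration` in the hunk above follows the same idea: pop targets until one's `scheduledFor` is still in the future, push that one straight back, and scrape the rest concurrently.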