Create `TargetPool` priority queue.

``TargetPool`` is a pool of targets pending scraping.  For now, it
uses the ``heap.Interface`` from ``container/heap`` to provide a
priority queue for the system to scrape from the next target.

It is my supposition that we'll use a model whereby we create a
``TargetPool`` for each scrape interval, into which ``Target``
instances are registered.
This commit is contained in:
Matt T. Proud 2013-01-04 12:17:31 +01:00
parent 3ac5d48b1a
commit 7a9777b4b5
3 changed files with 186 additions and 8 deletions

View file

@ -1,11 +1,19 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval package retrieval
import ( import (
"encoding/json"
"github.com/matttproud/prometheus/model"
"io/ioutil"
"net/http"
"strconv"
"time" "time"
) )
@ -18,16 +26,20 @@ const (
) )
type Target struct { type Target struct {
State TargetState scheduledFor time.Time
unreachableCount int
state TargetState
Address string Address string
Staleness time.Duration
Frequency time.Duration Frequency time.Duration
} }
// KEPT FOR LEGACY COMPATIBILITY; PENDING REFACTOR
func (t *Target) Scrape() (samples []model.Sample, err error) { func (t *Target) Scrape() (samples []model.Sample, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
t.State = ALIVE t.state = ALIVE
} }
}() }()

38
retrieval/targetpool.go Normal file
View file

@ -0,0 +1,38 @@
package retrieval
import (
"encoding/json"
"github.com/matttproud/prometheus/model"
"io/ioutil"
"net/http"
"strconv"
"time"
)
type TargetPool []*Target
func (p TargetPool) Len() int {
return len(p)
}
func (p TargetPool) Less(i, j int) bool {
return p[i].scheduledFor.Before(p[j].scheduledFor)
}
func (p *TargetPool) Pop() interface{} {
oldPool := *p
futureLength := p.Len() - 1
element := oldPool[futureLength]
futurePool := oldPool[0:futureLength]
*p = futurePool
return element
}
func (p *TargetPool) Push(element interface{}) {
*p = append(*p, element.(*Target))
}
func (p TargetPool) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}

View file

@ -0,0 +1,128 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"container/heap"
"testing"
"time"
)
func TestTargetPool(t *testing.T) {
type expectation struct {
size int
}
type input struct {
address string
scheduledFor time.Time
}
type output struct {
address string
}
var scenarios = []struct {
name string
outputs []output
inputs []input
}{
{
name: "empty",
inputs: []input{},
outputs: []output{},
},
{
name: "single element",
inputs: []input{
{
address: "http://single.com",
},
},
outputs: []output{
{
address: "http://single.com",
},
},
},
{
name: "plural descending schedules",
inputs: []input{
{
address: "http://plural-descending.com",
scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC),
},
{
address: "http://plural-descending.net",
scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC),
},
},
outputs: []output{
{
address: "http://plural-descending.net",
},
{
address: "http://plural-descending.com",
},
},
},
{
name: "plural ascending schedules",
inputs: []input{
{
address: "http://plural-ascending.net",
scheduledFor: time.Date(2013, 1, 4, 11, 0, 0, 0, time.UTC),
},
{
address: "http://plural-ascending.com",
scheduledFor: time.Date(2013, 1, 4, 12, 0, 0, 0, time.UTC),
},
},
outputs: []output{
{
address: "http://plural-ascending.net",
},
{
address: "http://plural-ascending.com",
},
},
},
}
for i, scenario := range scenarios {
pool := TargetPool{}
for _, input := range scenario.inputs {
target := Target{
Address: input.address,
scheduledFor: input.scheduledFor,
}
heap.Push(&pool, &target)
}
if pool.Len() != len(scenario.outputs) {
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len())
} else {
for j, output := range scenario.outputs {
target := heap.Pop(&pool).(*Target)
if target.Address != output.address {
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address)
}
}
}
}
}