mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Index pods by node name
Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
This commit is contained in:
parent
eb5512555d
commit
16bd0d7d5c
|
@ -511,7 +511,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|||
}
|
||||
pod := NewPod(
|
||||
log.With(d.logger, "role", "pod"),
|
||||
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
|
||||
d.newPodsByNodeInformer(plw),
|
||||
nodeInformer,
|
||||
)
|
||||
d.discoverers = append(d.discoverers, pod)
|
||||
|
@ -679,3 +679,18 @@ func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
|
|||
}
|
||||
return cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod)
|
||||
}
|
||||
|
||||
func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
|
||||
indexers := make(map[string]cache.IndexFunc)
|
||||
if d.attachMetadata.Node {
|
||||
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
|
||||
pod, ok := obj.(*apiv1.Pod)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("object is not a pod")
|
||||
}
|
||||
return []string{pod.Spec.NodeName}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncPeriod, indexers)
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import (
|
|||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
||||
const nodeIndex = "node"
|
||||
|
||||
var (
|
||||
podAddCount = eventCount.WithLabelValues("pod", "add")
|
||||
podUpdateCount = eventCount.WithLabelValues("pod", "update")
|
||||
|
@ -40,7 +42,7 @@ var (
|
|||
|
||||
// Pod discovers new pod targets.
|
||||
type Pod struct {
|
||||
podInf cache.SharedInformer
|
||||
podInf cache.SharedIndexInformer
|
||||
nodeInf cache.SharedInformer
|
||||
withNodeMetadata bool
|
||||
store cache.Store
|
||||
|
@ -49,10 +51,11 @@ type Pod struct {
|
|||
}
|
||||
|
||||
// NewPod creates a new pod discovery.
|
||||
func NewPod(l log.Logger, pods, nodes cache.SharedInformer) *Pod {
|
||||
func NewPod(l log.Logger, pods cache.SharedIndexInformer, nodes cache.SharedInformer) *Pod {
|
||||
if l == nil {
|
||||
l = log.NewNopLogger()
|
||||
}
|
||||
|
||||
p := &Pod{
|
||||
podInf: pods,
|
||||
nodeInf: nodes,
|
||||
|
@ -309,11 +312,14 @@ func (p *Pod) attachNodeMetadata(tg *targetgroup.Group, pod *apiv1.Pod) {
|
|||
}
|
||||
|
||||
func (p *Pod) enqueuePodsForNode(nodeName string) {
|
||||
for _, pod := range p.store.List() {
|
||||
pod := pod.(*apiv1.Pod)
|
||||
if pod.Spec.NodeName == nodeName {
|
||||
p.enqueue(pod)
|
||||
}
|
||||
pods, err := p.podInf.GetIndexer().ByIndex(nodeIndex, nodeName)
|
||||
if err != nil {
|
||||
level.Error(p.logger).Log("msg", "Error getting pods for node", "node", nodeName, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, pod := range pods {
|
||||
p.enqueue(pod.(*apiv1.Pod))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue