Merge pull request #1712 from fabric8io/kubernetes-sd-node-ip-labels

Kubernetes SD: Add labels for all node addresses and discover node port
This commit is contained in:
Brian Brazil 2016-06-07 12:37:40 +01:00
commit 33759dddd2
4 changed files with 50 additions and 14 deletions

View file

@ -129,7 +129,6 @@ var (
// DefaultKubernetesSDConfig is the default Kubernetes SD configuration // DefaultKubernetesSDConfig is the default Kubernetes SD configuration
DefaultKubernetesSDConfig = KubernetesSDConfig{ DefaultKubernetesSDConfig = KubernetesSDConfig{
KubeletPort: 10255,
RequestTimeout: model.Duration(10 * time.Second), RequestTimeout: model.Duration(10 * time.Second),
RetryInterval: model.Duration(1 * time.Second), RetryInterval: model.Duration(1 * time.Second),
} }
@ -752,7 +751,6 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
// KubernetesSDConfig is the configuration for Kubernetes service discovery. // KubernetesSDConfig is the configuration for Kubernetes service discovery.
type KubernetesSDConfig struct { type KubernetesSDConfig struct {
APIServers []URL `yaml:"api_servers"` APIServers []URL `yaml:"api_servers"`
KubeletPort int `yaml:"kubelet_port,omitempty"`
InCluster bool `yaml:"in_cluster,omitempty"` InCluster bool `yaml:"in_cluster,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"`

View file

@ -227,7 +227,6 @@ var expectedConf = &Config{
Username: "myusername", Username: "myusername",
Password: "mypassword", Password: "mypassword",
}, },
KubeletPort: 10255,
RequestTimeout: model.Duration(10 * time.Second), RequestTimeout: model.Duration(10 * time.Second),
RetryInterval: model.Duration(1 * time.Second), RetryInterval: model.Duration(1 * time.Second),
}, },

View file

@ -83,6 +83,10 @@ const (
nodesTargetGroupName = "nodes" nodesTargetGroupName = "nodes"
// nodeLabelPrefix is the prefix for the node labels. // nodeLabelPrefix is the prefix for the node labels.
nodeLabelPrefix = metaLabelPrefix + "node_label_" nodeLabelPrefix = metaLabelPrefix + "node_label_"
// nodeAddressPrefix is the prefix for the node addresses.
nodeAddressPrefix = metaLabelPrefix + "node_address_"
// nodePortLabel is the name of the label for the node port.
nodePortLabel = metaLabelPrefix + "node_port"
// apiServersTargetGroupName is the name given to the target group for API servers. // apiServersTargetGroupName is the name given to the target group for API servers.
apiServersTargetGroupName = "apiServers" apiServersTargetGroupName = "apiServers"
@ -292,18 +296,28 @@ func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup {
// Now let's loop through the nodes & add them to the target group with appropriate labels. // Now let's loop through the nodes & add them to the target group with appropriate labels.
for nodeName, node := range kd.nodes { for nodeName, node := range kd.nodes {
nodeAddress, err := nodeHostIP(node) defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node)
if err != nil { if err != nil {
log.Debugf("Skipping node %s: %s", node.Name, err) log.Debugf("Skipping node %s: %s", node.Name, err)
continue continue
} }
address := fmt.Sprintf("%s:%d", nodeAddress.String(), kd.Conf.KubeletPort) kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort)
t := model.LabelSet{ t := model.LabelSet{
model.AddressLabel: model.LabelValue(address), model.AddressLabel: model.LabelValue(address),
model.InstanceLabel: model.LabelValue(nodeName), model.InstanceLabel: model.LabelValue(nodeName),
} }
for addrType, ip := range nodeAddressMap {
labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType))
t[model.LabelName(labelName)] = model.LabelValue(ip[0].String())
}
t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort))
for k, v := range node.ObjectMeta.Labels { for k, v := range node.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k) labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v) t[model.LabelName(labelName)] = model.LabelValue(v)
@ -778,28 +792,33 @@ func until(f func(), period time.Duration, stopCh <-chan struct{}) {
} }
} }
// nodeHostIP returns the provided node's address, based on the priority: // nodeAddresses returns the provided node's address, based on the priority:
// 1. NodeInternalIP // 1. NodeInternalIP
// 2. NodeExternalIP // 2. NodeExternalIP
// 3. NodeLegacyHostIP // 3. NodeLegacyHostIP
// //
// Copied from k8s.io/kubernetes/pkg/util/node/node.go // Copied from k8s.io/kubernetes/pkg/util/node/node.go
func nodeHostIP(node *Node) (net.IP, error) { func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) {
addresses := node.Status.Addresses addresses := node.Status.Addresses
addressMap := make(map[NodeAddressType][]NodeAddress) addressMap := map[NodeAddressType][]net.IP{}
for i := range addresses { for _, addr := range addresses {
addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i]) ip := net.ParseIP(addr.Address)
// All addresses should be valid IPs.
if ip == nil {
continue
}
addressMap[addr.Type] = append(addressMap[addr.Type], ip)
} }
if addresses, ok := addressMap[NodeInternalIP]; ok { if addresses, ok := addressMap[NodeInternalIP]; ok {
return net.ParseIP(addresses[0].Address), nil return addresses[0], addressMap, nil
} }
if addresses, ok := addressMap[NodeExternalIP]; ok { if addresses, ok := addressMap[NodeExternalIP]; ok {
return net.ParseIP(addresses[0].Address), nil return addresses[0], addressMap, nil
} }
if addresses, ok := addressMap[NodeLegacyHostIP]; ok { if addresses, ok := addressMap[NodeLegacyHostIP]; ok {
return net.ParseIP(addresses[0].Address), nil return addresses[0], addressMap, nil
} }
return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
} }
func (kd *Discovery) updatePod(pod *Pod, eventType EventType) { func (kd *Discovery) updatePod(pod *Pod, eventType EventType) {

View file

@ -208,6 +208,26 @@ type EndpointsList struct {
type NodeStatus struct { type NodeStatus struct {
// Queried from cloud provider, if available. // Queried from cloud provider, if available.
Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node; see http://releases.k8s.io/HEAD/docs/admin/node.md#node-addresses" patchStrategy:"merge" patchMergeKey:"type"` Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node; see http://releases.k8s.io/HEAD/docs/admin/node.md#node-addresses" patchStrategy:"merge" patchMergeKey:"type"`
// Endpoints of daemons running on the Node.
DaemonEndpoints NodeDaemonEndpoints `json:"daemonEndpoints,omitempty"`
}
// NodeDaemonEndpoints lists ports opened by daemons running on the Node.
type NodeDaemonEndpoints struct {
// Endpoint on which Kubelet is listening.
KubeletEndpoint DaemonEndpoint `json:"kubeletEndpoint,omitempty"`
}
// DaemonEndpoint contains information about a single Daemon endpoint.
type DaemonEndpoint struct {
/*
The port tag was not properly in quotes in earlier releases, so it must be
uppercased for backwards compat (since it was falling back to var name of
'Port').
*/
// Port number of the given endpoint.
Port int32 `json:"Port"`
} }
// NodeAddressType can legally only have the values defined as constants below. // NodeAddressType can legally only have the values defined as constants below.