// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package discovery

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/prometheus/common/model"
	"github.com/samuel/go-zookeeper/zk"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/util/strutil"
	"github.com/prometheus/prometheus/util/treecache"
)

// Names of the meta labels attached to every discovered serverset target.
const (
	// serversetNodePrefix is not referenced by the code visible in this file;
	// presumably it is the name prefix of serverset member znodes — TODO confirm.
	serversetNodePrefix = "member_"

	// serversetLabelPrefix is the common prefix of all serverset meta labels.
	serversetLabelPrefix         = model.MetaLabelPrefix + "serverset_"
	// serversetStatusLabel carries the member's "status" field.
	serversetStatusLabel         = serversetLabelPrefix + "status"
	// serversetPathLabel carries the path the member data was read from.
	serversetPathLabel           = serversetLabelPrefix + "path"
	// serversetEndpointLabelPrefix is extended with "_host"/"_port" (and an
	// optional sanitized endpoint name) to form the per-endpoint labels.
	serversetEndpointLabelPrefix = serversetLabelPrefix + "endpoint"
	// serversetShardLabel carries the member's "shard" field.
	serversetShardLabel          = serversetLabelPrefix + "shard"
)

// serversetMember is the structure that the JSON payload of a serverset
// member is unmarshaled into by parseServersetMember.
type serversetMember struct {
	// ServiceEndpoint is the endpoint used to build the target address label.
	ServiceEndpoint     serversetEndpoint
	// AdditionalEndpoints maps an endpoint name to its host/port; each entry
	// is exported as a pair of name-suffixed host and port labels.
	AdditionalEndpoints map[string]serversetEndpoint
	// Status is exported verbatim as the serverset status label.
	Status              string `json:"status"`
	// Shard is exported in decimal form as the serverset shard label.
	Shard               int    `json:"shard"`
}

// serversetEndpoint is a host/port pair as it appears inside a serverset
// member's JSON payload.
type serversetEndpoint struct {
	Host string
	Port int
}

// ServersetDiscovery retrieves target information from a Serverset server
// and updates them via watches.
type ServersetDiscovery struct {
	// conf is the configuration passed to NewServersetDiscovery.
	conf       *config.ServersetSDConfig
	// conn is the Zookeeper connection; processUpdates closes it on exit.
	conn       *zk.Conn
	// mu is held while sources is read or modified; Run also holds it while
	// it sets sdUpdates.
	mu         sync.RWMutex
	// sources holds the most recent target group per event path.
	sources    map[string]*config.TargetGroup
	// sdUpdates is nil until Run is called; afterwards it points at the
	// channel handed to Run, to which processUpdates forwards target groups.
	sdUpdates  *chan<- config.TargetGroup
	// updates is the channel handed to every tree cache on construction and
	// drained by processUpdates.
	updates    chan treecache.ZookeeperTreeCacheEvent
	// treeCaches holds one tree cache per configured path; Run stops them
	// once its done channel fires.
	treeCaches []*treecache.ZookeeperTreeCache
}

// NewServersetDiscovery returns a new ServersetDiscovery for the given config.
//
// It connects to the Zookeeper servers listed in conf.Servers using
// conf.Timeout, starts the goroutine that consumes tree cache events, and
// creates one tree cache per entry in conf.Paths. If the connection cannot be
// set up, nil is returned.
func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery {
	conn, _, err := zk.Connect(conf.Servers, time.Duration(conf.Timeout))
	if err != nil {
		// The connection must not be touched when Connect reports an error:
		// calling a method on it could dereference a nil pointer.
		return nil
	}
	conn.SetLogger(treecache.ZookeeperLogger{})

	updates := make(chan treecache.ZookeeperTreeCacheEvent)
	sd := &ServersetDiscovery{
		conf:    conf,
		conn:    conn,
		updates: updates,
		sources: map[string]*config.TargetGroup{},
	}
	// Start the consumer before creating the tree caches, since they are
	// handed the unbuffered updates channel.
	go sd.processUpdates()
	for _, path := range conf.Paths {
		sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates))
	}
	return sd
}

// Sources implements the TargetProvider interface.
//
// It returns the path of every target group currently known, in no
// particular order. The result is never nil.
func (sd *ServersetDiscovery) Sources() []string {
	sd.mu.RLock()
	defer sd.mu.RUnlock()

	names := make([]string, 0, len(sd.sources))
	for name := range sd.sources {
		names = append(names, name)
	}
	return names
}

// processUpdates consumes tree cache events from sd.updates until that
// channel is closed. For every event it records (or removes) the target group
// for the event's path in sd.sources and, once Run has registered an output
// channel, forwards the group to it. An event without data, or with data that
// cannot be parsed, results in an empty target group for that path.
//
// On exit the Zookeeper connection is closed, and the output channel is closed
// if one was registered.
func (sd *ServersetDiscovery) processUpdates() {
	defer sd.conn.Close()
	for event := range sd.updates {
		tg := &config.TargetGroup{
			Source: event.Path,
		}

		sd.mu.Lock()
		stored := false
		if event.Data != nil {
			if labelSet, err := parseServersetMember(*event.Data, event.Path); err == nil {
				tg.Targets = []model.LabelSet{*labelSet}
				sd.sources[event.Path] = tg
				stored = true
			}
		}
		if !stored {
			delete(sd.sources, event.Path)
		}
		// Read the output channel while still holding the lock: Run sets it
		// under the same lock, so an unlocked read would be a data race.
		// Taking the snapshot in the same critical section as the map update
		// also means an update is either replayed by Run or forwarded here,
		// never both.
		out := sd.sdUpdates
		sd.mu.Unlock()

		// Send outside the lock so a slow receiver cannot block Sources.
		if out != nil {
			*out <- *tg
		}
	}

	sd.mu.RLock()
	out := sd.sdUpdates
	sd.mu.RUnlock()
	if out != nil {
		close(*out)
	}
}

// Run implements the TargetProvider interface.
//
// It first replays every target group collected so far onto ch, then
// registers ch as the destination for subsequent updates. It blocks until
// done fires and then stops all tree caches.
func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
	sd.mu.Lock()
	// Replay the current state while holding the lock.
	for _, tg := range sd.sources {
		ch <- *tg
	}
	// From here on processUpdates forwards new target groups to ch.
	sd.sdUpdates = &ch
	sd.mu.Unlock()

	<-done

	caches := sd.treeCaches
	for i := range caches {
		caches[i].Stop()
	}
}

// parseServersetMember decodes the JSON payload of a serverset member found
// at path and turns it into a label set describing one target.
//
// The resulting labels are: the address label (host:port of the service
// endpoint), the serverset path, the service endpoint's host and port, a
// host/port label pair per additional endpoint (suffixed with the sanitized
// endpoint name), the status and the shard.
//
// An error is returned if data is not valid JSON for a serverset member.
func parseServersetMember(data []byte, path string) (*model.LabelSet, error) {
	member := serversetMember{}
	if err := json.Unmarshal(data, &member); err != nil {
		return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err)
	}

	host := member.ServiceEndpoint.Host
	port := strconv.Itoa(member.ServiceEndpoint.Port)

	labels := model.LabelSet{}
	labels[serversetPathLabel] = model.LabelValue(path)
	// JoinHostPort brackets hosts containing a colon (IPv6 literals), which a
	// plain "host:port" concatenation would render unparseable.
	labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(host, port))

	labels[serversetEndpointLabelPrefix+"_host"] = model.LabelValue(host)
	labels[serversetEndpointLabelPrefix+"_port"] = model.LabelValue(port)

	for name, endpoint := range member.AdditionalEndpoints {
		// Endpoint names come from the payload and may contain characters
		// that are not valid in label names.
		cleanName := model.LabelName(strutil.SanitizeLabelName(name))
		labels[serversetEndpointLabelPrefix+"_host_"+cleanName] = model.LabelValue(endpoint.Host)
		labels[serversetEndpointLabelPrefix+"_port_"+cleanName] = model.LabelValue(strconv.Itoa(endpoint.Port))
	}

	labels[serversetStatusLabel] = model.LabelValue(member.Status)
	labels[serversetShardLabel] = model.LabelValue(strconv.Itoa(member.Shard))

	return &labels, nil
}