// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/httputil"
)

const (
	scrapeHealthMetricName   = "up"
	scrapeDurationMetricName = "scrape_duration_seconds"

	// Capacity of the channel to buffer samples during ingestion.
	ingestedSamplesCap = 256

	// Constants for instrumentation.
	namespace = "prometheus"
	interval  = "interval"
)

var (
	errIngestChannelFull = errors.New("ingestion channel full")

	targetIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Namespace:  namespace,
			Name:       "target_interval_length_seconds",
			Help:       "Actual intervals between scrapes.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{interval},
	)
)

func init() {
	prometheus.MustRegister(targetIntervalLength)
}

// TargetHealth describes the health state of a target.
type TargetHealth int

func (t TargetHealth) String() string {
	switch t {
	case HealthUnknown:
		return "unknown"
	case HealthGood:
		return "healthy"
	case HealthBad:
		return "unhealthy"
	}
	panic("unknown state")
}

func (t TargetHealth) value() model.SampleValue {
	if t == HealthGood {
		return 1
	}
	return 0
}

const (
	// HealthUnknown is the state of a Target before it is first scraped.
	HealthUnknown TargetHealth = iota
	// HealthGood is the state of a Target that has been successfully scraped.
	HealthGood
	// HealthBad is the state of a Target that was scraped unsuccessfully.
	HealthBad
)

// TargetStatus contains information about the current status of a scrape target.
type TargetStatus struct {
	lastError  error
	lastScrape time.Time
	health     TargetHealth

	mu sync.RWMutex
}

// LastError returns the error encountered during the last scrape.
func (ts *TargetStatus) LastError() error {
	ts.mu.RLock()
	defer ts.mu.RUnlock()
	return ts.lastError
}

// LastScrape returns the time of the last scrape.
func (ts *TargetStatus) LastScrape() time.Time {
	ts.mu.RLock()
	defer ts.mu.RUnlock()
	return ts.lastScrape
}

// Health returns the last known health state of the target.
func (ts *TargetStatus) Health() TargetHealth {
	ts.mu.RLock()
	defer ts.mu.RUnlock()
	return ts.health
}

func (ts *TargetStatus) setLastScrape(t time.Time) {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.lastScrape = t
}

func (ts *TargetStatus) setLastError(err error) {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	if err == nil {
		ts.health = HealthGood
	} else {
		ts.health = HealthBad
	}
	ts.lastError = err
}

// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
	// The status object for the target. It is only set once on initialization.
	status *TargetStatus
	// Closing scraperStopping signals that scraping should stop.
	scraperStopping chan struct{}
	// Closing scraperStopped signals that scraping has been stopped.
	scraperStopped chan struct{}
	// Channel to buffer ingested samples.
	ingestedSamples chan model.Vector

	// Mutex protects the members below.
	sync.RWMutex
	// The HTTP client used to scrape the target's endpoint.
	httpClient *http.Client
	// url is the URL to be scraped. Its host is immutable.
	url *url.URL
	// Labels before any processing.
	metaLabels model.LabelSet
	// Any base labels that are added to this target and its metrics.
	baseLabels model.LabelSet
	// Internal labels, such as scheme.
	internalLabels model.LabelSet
	// What is the deadline for the HTTP or HTTPS against this endpoint.
	deadline time.Duration
	// The time between two scrapes.
	scrapeInterval time.Duration
	// Whether the target's labels have precedence over the base labels
	// assigned by the scraping instance.
	honorLabels bool
	// Metric relabel configuration.
	metricRelabelConfigs []*config.RelabelConfig
}

// NewTarget creates a reasonably configured target for querying.
func NewTarget(cfg *config.ScrapeConfig, baseLabels, metaLabels model.LabelSet) *Target {
	t := &Target{
		url: &url.URL{
			Scheme: string(baseLabels[model.SchemeLabel]),
			Host:   string(baseLabels[model.AddressLabel]),
		},
		status:          &TargetStatus{},
		scraperStopping: make(chan struct{}),
		scraperStopped:  make(chan struct{}),
	}
	t.Update(cfg, baseLabels, metaLabels)
	return t
}

// Status returns the status of the target.
func (t *Target) Status() *TargetStatus {
	return t.status
}

// Update overwrites settings in the target that are derived from the job config
// it belongs to.
func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.LabelSet) {
	t.Lock()
	defer t.Unlock()

	httpClient, err := newHTTPClient(cfg)
	if err != nil {
		log.Errorf("cannot create HTTP client: %v", err)
		return
	}
	t.httpClient = httpClient

	t.url.Scheme = string(baseLabels[model.SchemeLabel])
	t.url.Path = string(baseLabels[model.MetricsPathLabel])

	t.internalLabels = model.LabelSet{}
	t.internalLabels[model.SchemeLabel] = baseLabels[model.SchemeLabel]
	t.internalLabels[model.MetricsPathLabel] = baseLabels[model.MetricsPathLabel]
	t.internalLabels[model.AddressLabel] = model.LabelValue(t.url.Host)

	params := url.Values{}

	for k, v := range cfg.Params {
		params[k] = make([]string, len(v))
		copy(params[k], v)
	}
	for k, v := range baseLabels {
		if strings.HasPrefix(string(k), model.ParamLabelPrefix) {
			if len(params[string(k[len(model.ParamLabelPrefix):])]) > 0 {
				params[string(k[len(model.ParamLabelPrefix):])][0] = string(v)
			} else {
				params[string(k[len(model.ParamLabelPrefix):])] = []string{string(v)}
			}
			t.internalLabels[model.ParamLabelPrefix+k[len(model.ParamLabelPrefix):]] = v
		}
	}
	t.url.RawQuery = params.Encode()

	t.scrapeInterval = time.Duration(cfg.ScrapeInterval)
	t.deadline = time.Duration(cfg.ScrapeTimeout)

	t.honorLabels = cfg.HonorLabels
	t.metaLabels = metaLabels
	t.baseLabels = model.LabelSet{}
	// All remaining internal labels will not be part of the label set.
	for name, val := range baseLabels {
		if !strings.HasPrefix(string(name), model.ReservedLabelPrefix) {
			t.baseLabels[name] = val
		}
	}
	if _, ok := t.baseLabels[model.InstanceLabel]; !ok {
		t.baseLabels[model.InstanceLabel] = model.LabelValue(t.InstanceIdentifier())
	}
	t.metricRelabelConfigs = cfg.MetricRelabelConfigs
}

func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
	rt := httputil.NewDeadlineRoundTripper(time.Duration(cfg.ScrapeTimeout), cfg.ProxyURL.URL)

	tlsOpts := httputil.TLSOptions{
		InsecureSkipVerify: cfg.TLSConfig.InsecureSkipVerify,
		CAFile:             cfg.TLSConfig.CAFile,
	}
	if len(cfg.TLSConfig.CertFile) > 0 && len(cfg.TLSConfig.KeyFile) > 0 {
		tlsOpts.CertFile = cfg.TLSConfig.CertFile
		tlsOpts.KeyFile = cfg.TLSConfig.KeyFile
	}
	tlsConfig, err := httputil.NewTLSConfig(tlsOpts)
	if err != nil {
		return nil, err
	}
	// Get a default roundtripper with the scrape timeout.
	tr := rt.(*http.Transport)
	// Set the TLS config from above
	tr.TLSClientConfig = tlsConfig
	rt = tr

	// If a bearer token is provided, create a round tripper that will set the
	// Authorization header correctly on each request.
	bearerToken := cfg.BearerToken
	if len(bearerToken) == 0 && len(cfg.BearerTokenFile) > 0 {
		b, err := ioutil.ReadFile(cfg.BearerTokenFile)
		if err != nil {
			return nil, fmt.Errorf("unable to read bearer token file %s: %s", cfg.BearerTokenFile, err)
		}
		bearerToken = string(b)
	}

	if len(bearerToken) > 0 {
		rt = httputil.NewBearerAuthRoundTripper(bearerToken, rt)
	}

	if cfg.BasicAuth != nil {
		rt = httputil.NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, rt)
	}

	// Return a new client with the configured round tripper.
	return httputil.NewClient(rt), nil
}

func (t *Target) String() string {
	return t.url.Host
}

// RunScraper implements Target.
func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
	defer close(t.scraperStopped)

	t.RLock()
	lastScrapeInterval := t.scrapeInterval
	t.RUnlock()

	log.Debugf("Starting scraper for target %v...", t)

	jitterTimer := time.NewTimer(time.Duration(float64(lastScrapeInterval) * rand.Float64()))
	select {
	case <-jitterTimer.C:
	case <-t.scraperStopping:
		jitterTimer.Stop()
		return
	}
	jitterTimer.Stop()

	ticker := time.NewTicker(lastScrapeInterval)
	defer ticker.Stop()

	t.status.setLastScrape(time.Now())
	t.scrape(sampleAppender)

	// Explanation of the contraption below:
	//
	// In case t.scraperStopping has something to receive, we want to read
	// from that channel rather than starting a new scrape (which might take very
	// long). That's why the outer select has no ticker.C. Should t.scraperStopping
	// not have anything to receive, we go into the inner select, where ticker.C
	// is in the mix.
	for {
		select {
		case <-t.scraperStopping:
			return
		default:
			select {
			case <-t.scraperStopping:
				return
			case <-ticker.C:
				took := time.Since(t.status.LastScrape())
				t.status.setLastScrape(time.Now())

				intervalStr := lastScrapeInterval.String()

				t.RLock()
				// On changed scrape interval the new interval becomes effective
				// after the next scrape.
				if lastScrapeInterval != t.scrapeInterval {
					ticker.Stop()
					ticker = time.NewTicker(t.scrapeInterval)
					lastScrapeInterval = t.scrapeInterval
				}
				t.RUnlock()

				targetIntervalLength.WithLabelValues(intervalStr).Observe(
					float64(took) / float64(time.Second), // Sub-second precision.
				)
				t.scrape(sampleAppender)
			}
		}
	}
}

// StopScraper implements Target.
func (t *Target) StopScraper() {
	log.Debugf("Stopping scraper for target %v...", t)

	close(t.scraperStopping)
	<-t.scraperStopped

	log.Debugf("Scraper for target %v stopped.", t)
}

func (t *Target) ingest(s model.Vector) error {
	t.RLock()
	deadline := t.deadline
	t.RUnlock()
	// Since the regular case is that ingestedSamples is ready to receive,
	// first try without setting a timeout so that we don't need to allocate
	// a timer most of the time.
	select {
	case t.ingestedSamples <- s:
		return nil
	default:
		select {
		case t.ingestedSamples <- s:
			return nil
		case <-time.After(deadline / 10):
			return errIngestChannelFull
		}
	}
}

const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`

func (t *Target) scrape(appender storage.SampleAppender) (err error) {
	start := time.Now()
	baseLabels := t.BaseLabels()

	defer func(appender storage.SampleAppender) {
		t.status.setLastError(err)
		recordScrapeHealth(appender, start, baseLabels, t.status.Health(), time.Since(start))
	}(appender)

	t.RLock()

	// The relabelAppender has to be inside the label-modifying appenders
	// so the relabeling rules are applied to the correct label set.
	if len(t.metricRelabelConfigs) > 0 {
		appender = relabelAppender{
			app:         appender,
			relabelings: t.metricRelabelConfigs,
		}
	}

	if t.honorLabels {
		appender = honorLabelsAppender{
			app:    appender,
			labels: baseLabels,
		}
	} else {
		appender = ruleLabelsAppender{
			app:    appender,
			labels: baseLabels,
		}
	}

	httpClient := t.httpClient

	t.RUnlock()

	req, err := http.NewRequest("GET", t.URL().String(), nil)
	if err != nil {
		return err
	}
	req.Header.Add("Accept", acceptHeader)

	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("server returned HTTP status %s", resp.Status)
	}

	dec := expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header))

	sdec := expfmt.SampleDecoder{
		Dec: dec,
		Opts: &expfmt.DecodeOptions{
			Timestamp: model.TimeFromUnixNano(start.UnixNano()),
		},
	}

	t.ingestedSamples = make(chan model.Vector, ingestedSamplesCap)

	go func() {
		for {
			// TODO(fabxc): Change the SampleAppender interface to return an error
			// so we can proceed based on the status and don't leak goroutines trying
			// to append a single sample after dropping all the other ones.
			//
			// This will also allow use to reuse this vector and save allocations.
			var samples model.Vector
			if err = sdec.Decode(&samples); err != nil {
				break
			}
			if err = t.ingest(samples); err != nil {
				break
			}
		}
		close(t.ingestedSamples)
	}()

	for samples := range t.ingestedSamples {
		for _, s := range samples {
			appender.Append(s)
		}
	}

	if err == io.EOF {
		return nil
	}
	return err
}

// Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct {
	app    storage.SampleAppender
	labels model.LabelSet
}

func (app ruleLabelsAppender) Append(s *model.Sample) {
	for ln, lv := range app.labels {
		if v, ok := s.Metric[ln]; ok && v != "" {
			s.Metric[model.ExportedLabelPrefix+ln] = v
		}
		s.Metric[ln] = lv
	}

	app.app.Append(s)
}

type honorLabelsAppender struct {
	app    storage.SampleAppender
	labels model.LabelSet
}

// Merges the sample's metric with the given labels if the label is not
// already present in the metric.
// This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Append(s *model.Sample) {
	for ln, lv := range app.labels {
		if _, ok := s.Metric[ln]; !ok {
			s.Metric[ln] = lv
		}
	}

	app.app.Append(s)
}

// Applies a set of relabel configurations to the sample's metric
// before actually appending it.
type relabelAppender struct {
	app         storage.SampleAppender
	relabelings []*config.RelabelConfig
}

func (app relabelAppender) Append(s *model.Sample) {
	labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...)
	if err != nil {
		log.Errorf("Error while relabeling metric %s: %s", s.Metric, err)
		return
	}
	// Check if the timeseries was dropped.
	if labels == nil {
		return
	}
	s.Metric = model.Metric(labels)

	app.app.Append(s)
}

// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
	t.RLock()
	defer t.RUnlock()

	u := &url.URL{}
	*u = *t.url
	return u
}

// InstanceIdentifier returns the identifier for the target.
func (t *Target) InstanceIdentifier() string {
	return t.url.Host
}

// fullLabels returns the base labels plus internal labels defining the target.
func (t *Target) fullLabels() model.LabelSet {
	t.RLock()
	defer t.RUnlock()
	lset := make(model.LabelSet, len(t.baseLabels)+len(t.internalLabels))
	for ln, lv := range t.baseLabels {
		lset[ln] = lv
	}
	for k, v := range t.internalLabels {
		lset[k] = v
	}
	return lset
}

// BaseLabels returns a copy of the target's base labels.
func (t *Target) BaseLabels() model.LabelSet {
	t.RLock()
	defer t.RUnlock()
	lset := make(model.LabelSet, len(t.baseLabels))
	for ln, lv := range t.baseLabels {
		lset[ln] = lv
	}
	return lset
}

// MetaLabels returns a copy of the target's labels before any processing.
func (t *Target) MetaLabels() model.LabelSet {
	t.RLock()
	defer t.RUnlock()
	lset := make(model.LabelSet, len(t.metaLabels))
	for ln, lv := range t.metaLabels {
		lset[ln] = lv
	}
	return lset
}

func recordScrapeHealth(
	sampleAppender storage.SampleAppender,
	timestamp time.Time,
	baseLabels model.LabelSet,
	health TargetHealth,
	scrapeDuration time.Duration,
) {
	healthMetric := make(model.Metric, len(baseLabels)+1)
	durationMetric := make(model.Metric, len(baseLabels)+1)

	healthMetric[model.MetricNameLabel] = scrapeHealthMetricName
	durationMetric[model.MetricNameLabel] = scrapeDurationMetricName

	for ln, lv := range baseLabels {
		healthMetric[ln] = lv
		durationMetric[ln] = lv
	}

	ts := model.TimeFromUnixNano(timestamp.UnixNano())

	healthSample := &model.Sample{
		Metric:    healthMetric,
		Timestamp: ts,
		Value:     health.value(),
	}
	durationSample := &model.Sample{
		Metric:    durationMetric,
		Timestamp: ts,
		Value:     model.SampleValue(float64(scrapeDuration) / float64(time.Second)),
	}

	sampleAppender.Append(healthSample)
	sampleAppender.Append(durationSample)
}