// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"github.com/prometheus/prometheus/retrieval/format"
	"log"
	"sort"
	"time"
)

// intervalKey is the map key under which runIteration passes the scrape
// interval to retrievalDurations when it records an iteration's duration.
const intervalKey = "interval"

// TargetPool holds a list of scrape targets together with the channels that
// are used to mutate that list and to signal the pool's Run loop.
type TargetPool struct {
	// done is the channel Stop sends on and Run receives from.
	done                chan bool
	// manager is asked to acquire/release around every individual scrape
	// (see runSingle).
	manager             TargetManager
	// targets is the current target list; runIteration sorts it in place by
	// each target's scheduledFor value.
	targets             []Target
	// addTargetQueue carries targets handed in through AddTarget; they are
	// appended to targets by Run or by an in-progress runIteration.
	addTargetQueue      chan Target
	// replaceTargetsQueue carries whole target lists handed in through
	// ReplaceTargets; they are applied by Run or by an in-progress
	// runIteration.
	replaceTargetsQueue chan []Target
}

// NewTargetPool returns an empty TargetPool whose scrapes are bracketed by
// m's acquire/release calls. All of the pool's channels are created
// unbuffered, so AddTarget, ReplaceTargets and Stop each block until the
// pool's loop receives the value.
func NewTargetPool(m TargetManager) *TargetPool {
	return &TargetPool{
		// Without an allocated channel, Stop would send on a nil channel and
		// block forever, and the receive on done in Run could never fire.
		done:                make(chan bool),
		manager:             m,
		addTargetQueue:      make(chan Target),
		replaceTargetsQueue: make(chan []Target),
	}
}

// Len reports how many targets the pool currently holds. Together with Less
// and Swap it allows a TargetPool to be passed to sort.Sort.
func (p TargetPool) Len() int {
	count := len(p.targets)
	return count
}

// Less reports whether the target at index i should sort ahead of the target
// at index j, as decided by calling Before on the values their scheduledFor
// methods return.
func (p TargetPool) Less(i, j int) bool {
	first, second := p.targets[i], p.targets[j]
	return first.scheduledFor().Before(second.scheduledFor())
}

// Swap exchanges the targets at indices i and j. The receiver is a copy of the
// pool, but it shares the underlying target slice, so the exchange is visible
// to the original pool.
func (p TargetPool) Swap(i, j int) {
	ts := p.targets
	ts[i], ts[j] = ts[j], ts[i]
}

// Run is the pool's main loop. Every interval it performs one scrape
// iteration via runIteration, passing results through to it; between
// iterations it applies targets received on addTargetQueue and target lists
// received on replaceTargetsQueue. Run returns once a value is received from
// the pool's done channel (see Stop).
func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
	// Use an explicitly stoppable ticker so its resources are released when
	// Run returns; before Go 1.23 the ticker behind time.Tick was never
	// reclaimed. For a non-positive interval the tick channel stays nil and
	// therefore never becomes ready, which is what time.Tick yields for such
	// values, whereas time.NewTicker would panic.
	var tick <-chan time.Time
	if interval > 0 {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		tick = ticker.C
	}

	for {
		select {
		case <-tick:
			p.runIteration(results, interval)
		case newTarget := <-p.addTargetQueue:
			p.addTarget(newTarget)
		case newTargets := <-p.replaceTargetsQueue:
			p.replaceTargets(newTargets)
		case <-p.done:
			log.Printf("TargetPool exiting...")
			// A bare break here would only leave the select statement and
			// re-enter the loop, so the pool would never actually stop.
			return
		}
	}
}

// Stop sends a single true value on the pool's done channel, the channel that
// Run's select loop receives from. It returns once that send has completed.
func (p TargetPool) Stop() {
	stop := p.done
	stop <- true
}

// AddTarget hands target to the pool by sending it on addTargetQueue. The
// target is appended to the pool's list by whichever of Run or runIteration
// receives it; AddTarget returns once the send has completed.
func (p *TargetPool) AddTarget(target Target) {
	queue := p.addTargetQueue
	queue <- target
}

// addTarget appends target to the end of the pool's target list. No check for
// an already-present target is made.
func (p *TargetPool) addTarget(target Target) {
	extended := append(p.targets, target)
	p.targets = extended
}

// ReplaceTargets hands a complete new target list to the pool by sending it on
// replaceTargetsQueue. The list is applied by whichever of Run or runIteration
// receives it; ReplaceTargets returns once the send has completed.
func (p *TargetPool) ReplaceTargets(newTargets []Target) {
	queue := p.replaceTargetsQueue
	queue <- newTargets
}

// replaceTargets makes newTargets the pool's target list. Wherever an existing
// target has the same Address as an entry of newTargets, the existing target
// is kept in that position instead (so whatever state it carries survives)
// after the entry currently at that position has been merged into it.
//
// newTargets is modified in place and retained by the pool. Every existing
// target is compared against every new one; the scan does not stop at the
// first match.
func (p *TargetPool) replaceTargets(newTargets []Target) {
	for idx := range newTargets {
		// Hold on to the incoming target for the address comparison: the slot
		// itself may be overwritten with an existing target below.
		incoming := newTargets[idx]
		for _, existing := range p.targets {
			if existing.Address() != incoming.Address() {
				continue
			}
			existing.Merge(newTargets[idx])
			newTargets[idx] = existing
		}
	}
	p.targets = newTargets
}

// runSingle scrapes one target, passing earliest and results through to its
// Scrape method. The scrape is bracketed by the manager's acquire and release
// calls; release is deferred so it runs even if Scrape panics.
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
	mgr := p.manager

	mgr.acquire()
	defer mgr.release()

	t.Scrape(earliest, results)
}

// runIteration performs one scrape pass over the pool. It sorts the targets by
// their scheduledFor value, starts a goroutine running runSingle for every
// target that is not scheduled after the current time, and then waits until
// every target present at the start of the pass has been accounted for. While
// waiting it keeps applying target additions and replacements so that senders
// on those queues are not blocked for the whole pass. Finally the elapsed
// time, truncated to whole milliseconds, is passed to retrievalDurations.Add
// together with the interval under intervalKey.
func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) {
	started := time.Now()

	total := p.Len()
	// Buffered to the number of targets so that no completion signal ever
	// blocks its sender.
	completions := make(chan bool, total)

	// Order the targets by scheduling time so the earliest come first.
	sort.Sort(p)

	for _, candidate := range p.targets {
		checkedAt := time.Now()

		if !candidate.scheduledFor().After(checkedAt) {
			// The target is due: scrape it concurrently and signal completion
			// when the scrape has returned.
			go func(t Target) {
				p.runSingle(checkedAt, results, t)
				completions <- true
			}(candidate)
			continue
		}

		// The target is not due yet; account for it right away so the wait
		// loop below still sees one signal per target.
		completions <- true
	}

	for outstanding := total; outstanding > 0; {
		select {
		case <-completions:
			outstanding--
		case added := <-p.addTargetQueue:
			p.addTarget(added)
		case replacement := <-p.replaceTargetsQueue:
			p.replaceTargets(replacement)
		}
	}

	// Every sender has delivered its signal by now, so closing is safe.
	close(completions)

	elapsedMs := float64(time.Since(started) / time.Millisecond)
	labels := map[string]string{intervalKey: interval.String()}
	retrievalDurations.Add(labels, elapsedMs)
}

// Targets returns the pool's internal target slice itself, not a copy, so the
// caller shares its backing array with the pool.
//
// BUG(all): Not really thread-safe. Only used in /status page for now.
func (p *TargetPool) Targets() []Target {
	current := p.targets
	return current
}