// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package textparse

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"strings"
	"unicode/utf8"

	"github.com/gogo/protobuf/proto"
	"github.com/pkg/errors"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"

	dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
)

// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
// protobuf format and then present it as it if were parsed by a
// Prometheus-2-style text parser. This is only done so that we can easily plug
// in the protobuf format into Prometheus 2. For future use (with the final
// format that will be used for native histograms), we have to revisit the
// parsing. A lot of the efficiency tricks of the Prometheus-2-style parsing
// could be used in a similar fashion (byte-slice pointers into the raw
// payload), which requires some hand-coded protobuf handling. But the current
// parsers all expect the full series name (metric name plus label pairs) as one
// string, which is not how things are represented in the protobuf format. If
// the re-arrangement work is actually causing problems (which has to be seen),
// that expectation needs to be changed.
type ProtobufParser struct {
	in        []byte // The intput to parse.
	inPos     int    // Position within the input.
	metricPos int    // Position within Metric slice.
	// fieldPos is the position within a Summary or (legacy) Histogram. -2
	// is the count. -1 is the sum. Otherwise it is the index within
	// quantiles/buckets.
	fieldPos   int
	fieldsDone bool // true if no more fields of a Summary or (legacy) Histogram to be processed.
	// state is marked by the entry we are processing. EntryInvalid implies
	// that we have to decode the next MetricFamily.
	state Entry

	builder labels.ScratchBuilder // held here to reduce allocations when building Labels

	mf *dto.MetricFamily

	// The following are just shenanigans to satisfy the Parser interface.
	metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric.
}

// NewProtobufParser returns a parser for the payload in the byte slice.
func NewProtobufParser(b []byte) Parser {
	return &ProtobufParser{
		in:          b,
		state:       EntryInvalid,
		mf:          &dto.MetricFamily{},
		metricBytes: &bytes.Buffer{},
	}
}

// Series returns the bytes of a series with a simple float64 as a
// value, the timestamp if set, and the value of the current sample.
func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
	var (
		m  = p.mf.GetMetric()[p.metricPos]
		ts = m.GetTimestampMs()
		v  float64
	)
	switch p.mf.GetType() {
	case dto.MetricType_COUNTER:
		v = m.GetCounter().GetValue()
	case dto.MetricType_GAUGE:
		v = m.GetGauge().GetValue()
	case dto.MetricType_UNTYPED:
		v = m.GetUntyped().GetValue()
	case dto.MetricType_SUMMARY:
		s := m.GetSummary()
		switch p.fieldPos {
		case -2:
			v = float64(s.GetSampleCount())
		case -1:
			v = s.GetSampleSum()
			// Need to detect a summaries without quantile here.
			if len(s.GetQuantile()) == 0 {
				p.fieldsDone = true
			}
		default:
			v = s.GetQuantile()[p.fieldPos].GetValue()
		}
	case dto.MetricType_HISTOGRAM:
		// This should only happen for a legacy histogram.
		h := m.GetHistogram()
		switch p.fieldPos {
		case -2:
			v = float64(h.GetSampleCount())
		case -1:
			v = h.GetSampleSum()
		default:
			bb := h.GetBucket()
			if p.fieldPos >= len(bb) {
				v = float64(h.GetSampleCount())
			} else {
				v = float64(bb[p.fieldPos].GetCumulativeCount())
			}
		}
	default:
		panic("encountered unexpected metric type, this is a bug")
	}
	if ts != 0 {
		return p.metricBytes.Bytes(), &ts, v
	}
	// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
	// general, but proto3 has no distinction between unset and
	// default. Need to avoid in the final format.
	return p.metricBytes.Bytes(), nil, v
}

// Histogram returns the bytes of a series with a native histogram as a value,
// the timestamp if set, and the native histogram in the current sample.
//
// The Compact method is called before returning the Histogram (or FloatHistogram).
//
// If the SampleCountFloat or the ZeroCountFloat in the proto message is > 0,
// the histogram is parsed and returned as a FloatHistogram and nil is returned
// as the (integer) Histogram return value. Otherwise, it is parsed and returned
// as an (integer) Histogram and nil is returned as the FloatHistogram return
// value.
func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
	var (
		m  = p.mf.GetMetric()[p.metricPos]
		ts = m.GetTimestampMs()
		h  = m.GetHistogram()
	)
	if h.GetSampleCountFloat() > 0 || h.GetZeroCountFloat() > 0 {
		// It is a float histogram.
		fh := histogram.FloatHistogram{
			Count:           h.GetSampleCountFloat(),
			Sum:             h.GetSampleSum(),
			ZeroThreshold:   h.GetZeroThreshold(),
			ZeroCount:       h.GetZeroCountFloat(),
			Schema:          h.GetSchema(),
			PositiveSpans:   make([]histogram.Span, len(h.GetPositiveSpan())),
			PositiveBuckets: h.GetPositiveCount(),
			NegativeSpans:   make([]histogram.Span, len(h.GetNegativeSpan())),
			NegativeBuckets: h.GetNegativeCount(),
		}
		for i, span := range h.GetPositiveSpan() {
			fh.PositiveSpans[i].Offset = span.GetOffset()
			fh.PositiveSpans[i].Length = span.GetLength()
		}
		for i, span := range h.GetNegativeSpan() {
			fh.NegativeSpans[i].Offset = span.GetOffset()
			fh.NegativeSpans[i].Length = span.GetLength()
		}
		fh.Compact(0)
		if ts != 0 {
			return p.metricBytes.Bytes(), &ts, nil, &fh
		}
		// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
		// general, but proto3 has no distinction between unset and
		// default. Need to avoid in the final format.
		return p.metricBytes.Bytes(), nil, nil, &fh
	}

	sh := histogram.Histogram{
		Count:           h.GetSampleCount(),
		Sum:             h.GetSampleSum(),
		ZeroThreshold:   h.GetZeroThreshold(),
		ZeroCount:       h.GetZeroCount(),
		Schema:          h.GetSchema(),
		PositiveSpans:   make([]histogram.Span, len(h.GetPositiveSpan())),
		PositiveBuckets: h.GetPositiveDelta(),
		NegativeSpans:   make([]histogram.Span, len(h.GetNegativeSpan())),
		NegativeBuckets: h.GetNegativeDelta(),
	}
	for i, span := range h.GetPositiveSpan() {
		sh.PositiveSpans[i].Offset = span.GetOffset()
		sh.PositiveSpans[i].Length = span.GetLength()
	}
	for i, span := range h.GetNegativeSpan() {
		sh.NegativeSpans[i].Offset = span.GetOffset()
		sh.NegativeSpans[i].Length = span.GetLength()
	}
	sh.Compact(0)
	if ts != 0 {
		return p.metricBytes.Bytes(), &ts, &sh, nil
	}
	return p.metricBytes.Bytes(), nil, &sh, nil
}

// Help returns the metric name and help text in the current entry.
// Must only be called after Next returned a help entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Help() ([]byte, []byte) {
	return p.metricBytes.Bytes(), []byte(p.mf.GetHelp())
}

// Type returns the metric name and type in the current entry.
// Must only be called after Next returned a type entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Type() ([]byte, MetricType) {
	n := p.metricBytes.Bytes()
	switch p.mf.GetType() {
	case dto.MetricType_COUNTER:
		return n, MetricTypeCounter
	case dto.MetricType_GAUGE:
		return n, MetricTypeGauge
	case dto.MetricType_HISTOGRAM:
		return n, MetricTypeHistogram
	case dto.MetricType_SUMMARY:
		return n, MetricTypeSummary
	}
	return n, MetricTypeUnknown
}

// Unit always returns (nil, nil) because units aren't supported by the protobuf
// format.
func (p *ProtobufParser) Unit() ([]byte, []byte) {
	return nil, nil
}

// Comment always returns nil because comments aren't supported by the protobuf
// format.
func (p *ProtobufParser) Comment() []byte {
	return nil
}

// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *ProtobufParser) Metric(l *labels.Labels) string {
	p.builder.Reset()
	p.builder.Add(labels.MetricName, p.getMagicName())

	for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
		p.builder.Add(lp.GetName(), lp.GetValue())
	}
	if needed, name, value := p.getMagicLabel(); needed {
		p.builder.Add(name, value)
	}

	// Sort labels to maintain the sorted labels invariant.
	p.builder.Sort()
	*l = p.builder.Labels()

	return p.metricBytes.String()
}

// Exemplar writes the exemplar of the current sample into the passed
// exemplar. It returns if an exemplar exists or not. In case of a native
// histogram, the legacy bucket section is still used for exemplars. To ingest
// all examplars, call the Exemplar method repeatedly until it returns false.
func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
	m := p.mf.GetMetric()[p.metricPos]
	var exProto *dto.Exemplar
	switch p.mf.GetType() {
	case dto.MetricType_COUNTER:
		exProto = m.GetCounter().GetExemplar()
	case dto.MetricType_HISTOGRAM:
		bb := m.GetHistogram().GetBucket()
		if p.fieldPos < 0 {
			if p.state == EntrySeries {
				return false // At _count or _sum.
			}
			p.fieldPos = 0 // Start at 1st bucket for native histograms.
		}
		for p.fieldPos < len(bb) {
			exProto = bb[p.fieldPos].GetExemplar()
			if p.state == EntrySeries {
				break
			}
			p.fieldPos++
			if exProto != nil {
				break
			}
		}
	default:
		return false
	}
	if exProto == nil {
		return false
	}
	ex.Value = exProto.GetValue()
	if ts := exProto.GetTimestamp(); ts != nil {
		ex.HasTs = true
		ex.Ts = ts.GetSeconds()*1000 + int64(ts.GetNanos()/1_000_000)
	}
	p.builder.Reset()
	for _, lp := range exProto.GetLabel() {
		p.builder.Add(lp.GetName(), lp.GetValue())
	}
	p.builder.Sort()
	ex.Labels = p.builder.Labels()
	return true
}

// Next advances the parser to the next "sample" (emulating the behavior of a
// text format parser). It returns (EntryInvalid, io.EOF) if no samples were
// read.
func (p *ProtobufParser) Next() (Entry, error) {
	switch p.state {
	case EntryInvalid:
		p.metricPos = 0
		p.fieldPos = -2
		n, err := readDelimited(p.in[p.inPos:], p.mf)
		p.inPos += n
		if err != nil {
			return p.state, err
		}

		// Skip empty metric families.
		if len(p.mf.GetMetric()) == 0 {
			return p.Next()
		}

		// We are at the beginning of a metric family. Put only the name
		// into metricBytes and validate only name and help for now.
		name := p.mf.GetName()
		if !model.IsValidMetricName(model.LabelValue(name)) {
			return EntryInvalid, errors.Errorf("invalid metric name: %s", name)
		}
		if help := p.mf.GetHelp(); !utf8.ValidString(help) {
			return EntryInvalid, errors.Errorf("invalid help for metric %q: %s", name, help)
		}
		p.metricBytes.Reset()
		p.metricBytes.WriteString(name)

		p.state = EntryHelp
	case EntryHelp:
		p.state = EntryType
	case EntryType:
		if p.mf.GetType() == dto.MetricType_HISTOGRAM &&
			isNativeHistogram(p.mf.GetMetric()[0].GetHistogram()) {
			p.state = EntryHistogram
		} else {
			p.state = EntrySeries
		}
		if err := p.updateMetricBytes(); err != nil {
			return EntryInvalid, err
		}
	case EntryHistogram, EntrySeries:
		if p.state == EntrySeries && !p.fieldsDone &&
			(p.mf.GetType() == dto.MetricType_SUMMARY || p.mf.GetType() == dto.MetricType_HISTOGRAM) {
			p.fieldPos++
		} else {
			p.metricPos++
			p.fieldPos = -2
			p.fieldsDone = false
		}
		if p.metricPos >= len(p.mf.GetMetric()) {
			p.state = EntryInvalid
			return p.Next()
		}
		if err := p.updateMetricBytes(); err != nil {
			return EntryInvalid, err
		}
	default:
		return EntryInvalid, errors.Errorf("invalid protobuf parsing state: %d", p.state)
	}
	return p.state, nil
}

func (p *ProtobufParser) updateMetricBytes() error {
	b := p.metricBytes
	b.Reset()
	b.WriteString(p.getMagicName())
	for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
		b.WriteByte(model.SeparatorByte)
		n := lp.GetName()
		if !model.LabelName(n).IsValid() {
			return errors.Errorf("invalid label name: %s", n)
		}
		b.WriteString(n)
		b.WriteByte(model.SeparatorByte)
		v := lp.GetValue()
		if !utf8.ValidString(v) {
			return errors.Errorf("invalid label value: %s", v)
		}
		b.WriteString(v)
	}
	if needed, n, v := p.getMagicLabel(); needed {
		b.WriteByte(model.SeparatorByte)
		b.WriteString(n)
		b.WriteByte(model.SeparatorByte)
		b.WriteString(v)
	}
	return nil
}

// getMagicName usually just returns p.mf.GetType() but adds a magic suffix
// ("_count", "_sum", "_bucket") if needed according to the current parser
// state.
func (p *ProtobufParser) getMagicName() string {
	t := p.mf.GetType()
	if p.state == EntryHistogram || (t != dto.MetricType_HISTOGRAM && t != dto.MetricType_SUMMARY) {
		return p.mf.GetName()
	}
	if p.fieldPos == -2 {
		return p.mf.GetName() + "_count"
	}
	if p.fieldPos == -1 {
		return p.mf.GetName() + "_sum"
	}
	if t == dto.MetricType_HISTOGRAM {
		return p.mf.GetName() + "_bucket"
	}
	return p.mf.GetName()
}

// getMagicLabel returns if a magic label ("quantile" or "le") is needed and, if
// so, its name and value. It also sets p.fieldsDone if applicable.
func (p *ProtobufParser) getMagicLabel() (bool, string, string) {
	if p.state == EntryHistogram || p.fieldPos < 0 {
		return false, "", ""
	}
	switch p.mf.GetType() {
	case dto.MetricType_SUMMARY:
		qq := p.mf.GetMetric()[p.metricPos].GetSummary().GetQuantile()
		q := qq[p.fieldPos]
		p.fieldsDone = p.fieldPos == len(qq)-1
		return true, model.QuantileLabel, formatOpenMetricsFloat(q.GetQuantile())
	case dto.MetricType_HISTOGRAM:
		bb := p.mf.GetMetric()[p.metricPos].GetHistogram().GetBucket()
		if p.fieldPos >= len(bb) {
			p.fieldsDone = true
			return true, model.BucketLabel, "+Inf"
		}
		b := bb[p.fieldPos]
		p.fieldsDone = math.IsInf(b.GetUpperBound(), +1)
		return true, model.BucketLabel, formatOpenMetricsFloat(b.GetUpperBound())
	}
	return false, "", ""
}

var errInvalidVarint = errors.New("protobufparse: invalid varint encountered")

// readDelimited is essentially doing what the function of the same name in
// github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is
// specific to a MetricFamily, utilizes the more efficient gogo-protobuf
// unmarshaling, and acts on a byte slice directly without any additional
// staging buffers.
func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
	if len(b) == 0 {
		return 0, io.EOF
	}
	messageLength, varIntLength := proto.DecodeVarint(b)
	if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 {
		return 0, errInvalidVarint
	}
	totalLength := varIntLength + int(messageLength)
	if totalLength > len(b) {
		return 0, errors.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
	}
	mf.Reset()
	return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
}

// formatOpenMetricsFloat works like the usual Go string formatting of a fleat
// but appends ".0" if the resulting number would otherwise contain neither a
// "." nor an "e".
func formatOpenMetricsFloat(f float64) string {
	// A few common cases hardcoded.
	switch {
	case f == 1:
		return "1.0"
	case f == 0:
		return "0.0"
	case f == -1:
		return "-1.0"
	case math.IsNaN(f):
		return "NaN"
	case math.IsInf(f, +1):
		return "+Inf"
	case math.IsInf(f, -1):
		return "-Inf"
	}
	s := fmt.Sprint(f)
	if strings.ContainsAny(s, "e.") {
		return s
	}
	return s + ".0"
}

// isNativeHistogram returns false iff the provided histograms has no sparse
// buckets and a zero threshold of 0 and a zero count of 0. In principle, this
// could still be meant to be a native histogram (with a zero threshold of 0 and
// no observations yet), but for now, we'll treat this case as a conventional
// histogram.
//
// TODO(beorn7): In the final format, there should be an unambiguous way of
// deciding if a histogram should be ingested as a conventional one or a native
// one.
func isNativeHistogram(h *dto.Histogram) bool {
	return len(h.GetNegativeDelta()) > 0 ||
		len(h.GetPositiveDelta()) > 0 ||
		h.GetZeroCount() > 0 ||
		h.GetZeroThreshold() > 0
}