// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package promql

import (
	"bytes"
	"container/heap"
	"context"
	"errors"
	"fmt"
	"math"
	"reflect"
	"runtime"
	"sort"
	"strconv"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/regexp"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/exp/slices"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/model/value"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/util/annotations"
	"github.com/prometheus/prometheus/util/stats"
	"github.com/prometheus/prometheus/util/zeropool"
)

const (
	namespace            = "prometheus"
	subsystem            = "engine"
	queryTag             = "query"
	env                  = "query execution"
	defaultLookbackDelta = 5 * time.Minute

	// The largest SampleValue that can be converted to an int64 without overflow.
	maxInt64 = 9223372036854774784
	// The smallest SampleValue that can be converted to an int64 without underflow.
	minInt64 = -9223372036854775808

	// Max initial size for the pooled points slices.
	// The getHPointSlice and getFPointSlice functions are called with an estimated size which often can be
	// over-estimated.
	maxPointsSliceSize = 5000
)

type engineMetrics struct {
	currentQueries       prometheus.Gauge
	maxConcurrentQueries prometheus.Gauge
	queryLogEnabled      prometheus.Gauge
	queryLogFailures     prometheus.Counter
	queryQueueTime       prometheus.Observer
	queryPrepareTime     prometheus.Observer
	queryInnerEval       prometheus.Observer
	queryResultSort      prometheus.Observer
	querySamples         prometheus.Counter
}

// convertibleToInt64 returns true if v does not over-/underflow an int64.
func convertibleToInt64(v float64) bool {
	return v <= maxInt64 && v >= minInt64
}

type (
	// ErrQueryTimeout is returned if a query timed out during processing.
	ErrQueryTimeout string
	// ErrQueryCanceled is returned if a query was canceled during processing.
	ErrQueryCanceled string
	// ErrTooManySamples is returned if a query would load more than the maximum allowed samples into memory.
	ErrTooManySamples string
	// ErrStorage is returned if an error was encountered in the storage layer
	// during query handling.
	ErrStorage struct{ Err error }
)

func (e ErrQueryTimeout) Error() string {
	return fmt.Sprintf("query timed out in %s", string(e))
}

func (e ErrQueryCanceled) Error() string {
	return fmt.Sprintf("query was canceled in %s", string(e))
}

func (e ErrTooManySamples) Error() string {
	return fmt.Sprintf("query processing would load too many samples into memory in %s", string(e))
}

func (e ErrStorage) Error() string {
	return e.Err.Error()
}

// QueryLogger is an interface that can be used to log all the queries
// executed by the engine.
type QueryLogger interface {
	Log(...interface{}) error
	Close() error
}

// A Query is derived from a raw query string and can be run against the engine
// it is associated with.
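//
// A minimal usage sketch (ng, q, and ctx are placeholders for an existing
// *Engine, storage.Queryable, and context.Context):
//
//	qry, err := ng.NewInstantQuery(ctx, q, nil, "up == 0", time.Now())
//	if err != nil {
//		return err
//	}
//	defer qry.Close()
//	res := qry.Exec(ctx)
//	if res.Err != nil {
//		return res.Err
//	}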
type Query interface {
	// Exec processes the query. Can only be called once.
	Exec(ctx context.Context) *Result
	// Close recovers memory used by the query result.
	Close()
	// Statement returns the parsed statement of the query.
	Statement() parser.Statement
	// Stats returns statistics about the lifetime of the query.
	Stats() *stats.Statistics
	// Cancel signals that a running query execution should be aborted.
	Cancel()
	// String returns the original query string.
	String() string
}

type PrometheusQueryOpts struct {
	// Enables recording per-step statistics if the engine has it enabled as well. Disabled by default.
	enablePerStepStats bool
	// Lookback delta duration for this query.
	lookbackDelta time.Duration
}

var _ QueryOpts = &PrometheusQueryOpts{}

func NewPrometheusQueryOpts(enablePerStepStats bool, lookbackDelta time.Duration) QueryOpts {
	return &PrometheusQueryOpts{
		enablePerStepStats: enablePerStepStats,
		lookbackDelta:      lookbackDelta,
	}
}

func (p *PrometheusQueryOpts) EnablePerStepStats() bool {
	return p.enablePerStepStats
}

func (p *PrometheusQueryOpts) LookbackDelta() time.Duration {
	return p.lookbackDelta
}

type QueryOpts interface {
	// Enables recording per-step statistics if the engine has it enabled as well. Disabled by default.
	EnablePerStepStats() bool
	// Lookback delta duration for this query.
	LookbackDelta() time.Duration
}

// query implements the Query interface.
type query struct {
	// Underlying data provider.
	queryable storage.Queryable
	// The original query string.
	q string
	// Statement of the parsed query.
	stmt parser.Statement
	// Timer stats for the query execution.
	stats *stats.QueryTimers
	// Sample stats for the query execution.
	sampleStats *stats.QuerySamples
	// Result matrix for reuse.
	matrix Matrix
	// Cancellation function for the query.
	cancel func()

	// The engine against which the query is executed.
	ng *Engine
}

type QueryOrigin struct{}

// Statement implements the Query interface.
// Calling this after Exec may result in panic,
// see https://github.com/prometheus/prometheus/issues/8949.
func (q *query) Statement() parser.Statement {
	return q.stmt
}

// String implements the Query interface.
func (q *query) String() string {
	return q.q
}

// Stats implements the Query interface.
func (q *query) Stats() *stats.Statistics {
	return &stats.Statistics{
		Timers:  q.stats,
		Samples: q.sampleStats,
	}
}

// Cancel implements the Query interface.
func (q *query) Cancel() {
	if q.cancel != nil {
		q.cancel()
	}
}

// Close implements the Query interface.
func (q *query) Close() {
	for _, s := range q.matrix {
		putFPointSlice(s.Floats)
		putHPointSlice(s.Histograms)
	}
}

// Exec implements the Query interface.
func (q *query) Exec(ctx context.Context) *Result {
	if span := trace.SpanFromContext(ctx); span != nil {
		span.SetAttributes(attribute.String(queryTag, q.stmt.String()))
	}

	// Exec query.
	res, warnings, err := q.ng.exec(ctx, q)
	return &Result{Err: err, Value: res, Warnings: warnings}
}

// contextDone returns an error if the context was canceled or timed out.
func contextDone(ctx context.Context, env string) error {
	if err := ctx.Err(); err != nil {
		return contextErr(err, env)
	}
	return nil
}

func contextErr(err error, env string) error {
	switch {
	case errors.Is(err, context.Canceled):
		return ErrQueryCanceled(env)
	case errors.Is(err, context.DeadlineExceeded):
		return ErrQueryTimeout(env)
	default:
		return err
	}
}

// QueryTracker provides access to two features:
//
// 1) Tracking of active queries. If the PromQL engine crashes while executing a query, that query should still be
// present in the tracker on restart and hence get logged. After the logging on restart, the tracker gets emptied.
//
// 2) Enforcement of the maximum number of concurrent queries.
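//
// A minimal sketch of a tracker that only enforces concurrency (without the
// crash-logging behaviour), using a buffered channel as a semaphore. This is
// only an illustration, not the tracker shipped with Prometheus, and
// semTracker is a made-up name:
//
//	type semTracker struct{ sem chan struct{} }
//
//	func (t semTracker) GetMaxConcurrent() int { return cap(t.sem) }
//
//	func (t semTracker) Insert(ctx context.Context, query string) (int, error) {
//		select {
//		case t.sem <- struct{}{}: // Acquire a slot or block until one is free.
//			return 0, nil
//		case <-ctx.Done():
//			return 0, ctx.Err()
//		}
//	}
//
//	func (t semTracker) Delete(int) { <-t.sem } // Release the slot.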
type QueryTracker interface {
	// GetMaxConcurrent returns maximum number of concurrent queries that are allowed by this tracker.
	GetMaxConcurrent() int

	// Insert inserts a query into the query tracker. This call must block if the maximum number of queries is already running.
	// If Insert doesn't return an error, the returned integer value should be used in the subsequent Delete call.
	// Insert should return an error if the context is finished before the query can proceed; in that case the returned integer value should be ignored by the caller.
	Insert(ctx context.Context, query string) (int, error)

	// Delete removes the query from the activity tracker. insertIndex is the value returned by the corresponding Insert call.
	Delete(insertIndex int)
}

// EngineOpts contains configuration options used when creating a new Engine.
type EngineOpts struct {
	Logger             log.Logger
	Reg                prometheus.Registerer
	MaxSamples         int
	Timeout            time.Duration
	ActiveQueryTracker QueryTracker
	// LookbackDelta determines the time since the last sample after which a time
	// series is considered stale.
	LookbackDelta time.Duration

	// NoStepSubqueryIntervalFn returns the default evaluation interval of
	// a subquery in milliseconds if no step was specified in the subquery, i.e. `[30m:]` rather than `[30m:<step>]`.
	NoStepSubqueryIntervalFn func(rangeMillis int64) int64

	// EnableAtModifier if true enables @ modifier. Disabled otherwise. This
	// is supposed to be enabled for regular PromQL (as of Prometheus v2.33)
	// but the option to disable it is still provided here for those using
	// the Engine outside of Prometheus.
	EnableAtModifier bool

	// EnableNegativeOffset if true enables negative (-) offset
	// values. Disabled otherwise. This is supposed to be enabled for
	// regular PromQL (as of Prometheus v2.33) but the option to disable it
	// is still provided here for those using the Engine outside of
	// Prometheus.
	EnableNegativeOffset bool

	// EnablePerStepStats if true allows for per-step stats to be computed on request. Disabled otherwise.
	EnablePerStepStats bool
}

// Engine handles the lifetime of queries from beginning to end.
// It is connected to a querier.
type Engine struct {
	logger                   log.Logger
	metrics                  *engineMetrics
	timeout                  time.Duration
	maxSamplesPerQuery       int
	activeQueryTracker       QueryTracker
	queryLogger              QueryLogger
	queryLoggerLock          sync.RWMutex
	lookbackDelta            time.Duration
	noStepSubqueryIntervalFn func(rangeMillis int64) int64
	enableAtModifier         bool
	enableNegativeOffset     bool
	enablePerStepStats       bool
}

// NewEngine returns a new engine.
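//
// A minimal construction sketch; the option values below are illustrative,
// not recommended defaults:
//
//	eng := NewEngine(EngineOpts{
//		Logger:               log.NewNopLogger(),
//		Reg:                  prometheus.NewRegistry(),
//		MaxSamples:           50000000,
//		Timeout:              2 * time.Minute,
//		LookbackDelta:        5 * time.Minute,
//		EnableAtModifier:     true,
//		EnableNegativeOffset: true,
//		NoStepSubqueryIntervalFn: func(int64) int64 {
//			return durationMilliseconds(time.Minute)
//		},
//	})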
func NewEngine(opts EngineOpts) *Engine {
	if opts.Logger == nil {
		opts.Logger = log.NewNopLogger()
	}

	queryResultSummary := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  namespace,
		Subsystem:  subsystem,
		Name:       "query_duration_seconds",
		Help:       "Query timings",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	},
		[]string{"slice"},
	)

	metrics := &engineMetrics{
		currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queries",
			Help:      "The current number of queries being executed or waiting.",
		}),
		queryLogEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "query_log_enabled",
			Help:      "State of the query log.",
		}),
		queryLogFailures: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "query_log_failures_total",
			Help:      "The number of query log failures.",
		}),
		maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queries_concurrent_max",
			Help:      "The max number of concurrent queries.",
		}),
		querySamples: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "query_samples_total",
			Help:      "The total number of samples loaded by all queries.",
		}),
		queryQueueTime:   queryResultSummary.WithLabelValues("queue_time"),
		queryPrepareTime: queryResultSummary.WithLabelValues("prepare_time"),
		queryInnerEval:   queryResultSummary.WithLabelValues("inner_eval"),
		queryResultSort:  queryResultSummary.WithLabelValues("result_sort"),
	}

	if t := opts.ActiveQueryTracker; t != nil {
		metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent()))
	} else {
		metrics.maxConcurrentQueries.Set(-1)
	}

	if opts.LookbackDelta == 0 {
		opts.LookbackDelta = defaultLookbackDelta
		if l := opts.Logger; l != nil {
			level.Debug(l).Log("msg", "Lookback delta is zero, setting to default value", "value", defaultLookbackDelta)
		}
	}

	if opts.Reg != nil {
		opts.Reg.MustRegister(
			metrics.currentQueries,
			metrics.maxConcurrentQueries,
			metrics.queryLogEnabled,
			metrics.queryLogFailures,
			metrics.querySamples,
			queryResultSummary,
		)
	}

	return &Engine{
		timeout:                  opts.Timeout,
		logger:                   opts.Logger,
		metrics:                  metrics,
		maxSamplesPerQuery:       opts.MaxSamples,
		activeQueryTracker:       opts.ActiveQueryTracker,
		lookbackDelta:            opts.LookbackDelta,
		noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn,
		enableAtModifier:         opts.EnableAtModifier,
		enableNegativeOffset:     opts.EnableNegativeOffset,
		enablePerStepStats:       opts.EnablePerStepStats,
	}
}

// SetQueryLogger sets the query logger.
func (ng *Engine) SetQueryLogger(l QueryLogger) {
	ng.queryLoggerLock.Lock()
	defer ng.queryLoggerLock.Unlock()

	if ng.queryLogger != nil {
		// An error closing the old file descriptor should
		// not make reload fail; only log a warning.
		err := ng.queryLogger.Close()
		if err != nil {
			level.Warn(ng.logger).Log("msg", "Error while closing the previous query log file", "err", err)
		}
	}

	ng.queryLogger = l

	if l != nil {
		ng.metrics.queryLogEnabled.Set(1)
	} else {
		ng.metrics.queryLogEnabled.Set(0)
	}
}

// NewInstantQuery returns an evaluation query for the given expression at the given time.
func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, ts time.Time) (Query, error) {
	pExpr, qry := ng.newQuery(q, qs, opts, ts, ts, 0)
	finishQueue, err := ng.queueActive(ctx, qry)
	if err != nil {
		return nil, err
	}
	defer finishQueue()
	expr, err := parser.ParseExpr(qs)
	if err != nil {
		return nil, err
	}
	if err := ng.validateOpts(expr); err != nil {
		return nil, err
	}
	*pExpr = PreprocessExpr(expr, ts, ts)

	return qry, nil
}

// NewRangeQuery returns an evaluation query for the given time range and with
// the resolution set by the interval.
func (ng *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) {
	pExpr, qry := ng.newQuery(q, qs, opts, start, end, interval)
	finishQueue, err := ng.queueActive(ctx, qry)
	if err != nil {
		return nil, err
	}
	defer finishQueue()
	expr, err := parser.ParseExpr(qs)
	if err != nil {
		return nil, err
	}
	if err := ng.validateOpts(expr); err != nil {
		return nil, err
	}
	if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
		return nil, fmt.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
	}
	*pExpr = PreprocessExpr(expr, start, end)

	return qry, nil
}

func (ng *Engine) newQuery(q storage.Queryable, qs string, opts QueryOpts, start, end time.Time, interval time.Duration) (*parser.Expr, *query) {
	if opts == nil {
		opts = NewPrometheusQueryOpts(false, 0)
	}

	lookbackDelta := opts.LookbackDelta()
	if lookbackDelta <= 0 {
		lookbackDelta = ng.lookbackDelta
	}

	es := &parser.EvalStmt{
		Start:         start,
		End:           end,
		Interval:      interval,
		LookbackDelta: lookbackDelta,
	}
	qry := &query{
		q:           qs,
		stmt:        es,
		ng:          ng,
		stats:       stats.NewQueryTimers(),
		sampleStats: stats.NewQuerySamples(ng.enablePerStepStats && opts.EnablePerStepStats()),
		queryable:   q,
	}
	return &es.Expr, qry
}

var (
	ErrValidationAtModifierDisabled     = errors.New("@ modifier is disabled")
	ErrValidationNegativeOffsetDisabled = errors.New("negative offset is disabled")
)

func (ng *Engine) validateOpts(expr parser.Expr) error {
	if ng.enableAtModifier && ng.enableNegativeOffset {
		return nil
	}

	var atModifierUsed, negativeOffsetUsed bool

	var validationErr error
	parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
		switch n := node.(type) {
		case *parser.VectorSelector:
			if n.Timestamp != nil || n.StartOrEnd == parser.START || n.StartOrEnd == parser.END {
				atModifierUsed = true
			}
			if n.OriginalOffset < 0 {
				negativeOffsetUsed = true
			}

		case *parser.MatrixSelector:
			vs := n.VectorSelector.(*parser.VectorSelector)
			if vs.Timestamp != nil || vs.StartOrEnd == parser.START || vs.StartOrEnd == parser.END {
				atModifierUsed = true
			}
			if vs.OriginalOffset < 0 {
				negativeOffsetUsed = true
			}

		case *parser.SubqueryExpr:
			if n.Timestamp != nil || n.StartOrEnd == parser.START || n.StartOrEnd == parser.END {
				atModifierUsed = true
			}
			if n.OriginalOffset < 0 {
				negativeOffsetUsed = true
			}
		}

		if atModifierUsed && !ng.enableAtModifier {
			validationErr = ErrValidationAtModifierDisabled
			return validationErr
		}
		if negativeOffsetUsed && !ng.enableNegativeOffset {
			validationErr = ErrValidationNegativeOffsetDisabled
			return validationErr
		}

		return nil
	})

	return validationErr
}

func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
	qry := &query{
		q:           "test statement",
		stmt:        parser.TestStmt(f),
		ng:          ng,
		stats:       stats.NewQueryTimers(),
		sampleStats: stats.NewQuerySamples(ng.enablePerStepStats),
	}
	return qry
}

// exec executes the query.
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws annotations.Annotations, err error) {
	ng.metrics.currentQueries.Inc()
	defer func() {
		ng.metrics.currentQueries.Dec()
		ng.metrics.querySamples.Add(float64(q.sampleStats.TotalSamples))
	}()

	ctx, cancel := context.WithTimeout(ctx, ng.timeout)
	q.cancel = cancel

	defer func() {
		ng.queryLoggerLock.RLock()
		if l := ng.queryLogger; l != nil {
			params := make(map[string]interface{}, 4)
			params["query"] = q.q
			if eq, ok := q.Statement().(*parser.EvalStmt); ok {
				params["start"] = formatDate(eq.Start)
				params["end"] = formatDate(eq.End)
				// The step provided by the user is in seconds.
				params["step"] = int64(eq.Interval / (time.Second / time.Nanosecond))
			}
			f := []interface{}{"params", params}
			if err != nil {
				f = append(f, "error", err)
			}
			f = append(f, "stats", stats.NewQueryStats(q.Stats()))
			if span := trace.SpanFromContext(ctx); span != nil {
				f = append(f, "spanID", span.SpanContext().SpanID())
			}
			if origin := ctx.Value(QueryOrigin{}); origin != nil {
				for k, v := range origin.(map[string]interface{}) {
					f = append(f, k, v)
				}
			}
			if err := l.Log(f...); err != nil {
				ng.metrics.queryLogFailures.Inc()
				level.Error(ng.logger).Log("msg", "can't log query", "err", err)
			}
		}
		ng.queryLoggerLock.RUnlock()
	}()

	execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime)
	defer execSpanTimer.Finish()

	finishQueue, err := ng.queueActive(ctx, q)
	if err != nil {
		return nil, nil, err
	}
	defer finishQueue()

	// Cancel when execution is done or an error was raised.
	defer q.cancel()

	evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime)
	defer evalSpanTimer.Finish()

	// The base context might already be canceled on the first iteration (e.g. during shutdown).
	if err := contextDone(ctx, env); err != nil {
		return nil, nil, err
	}

	switch s := q.Statement().(type) {
	case *parser.EvalStmt:
		return ng.execEvalStmt(ctx, q, s)
	case parser.TestStmt:
		return nil, nil, s(ctx)
	}

	panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
}

// queueActive logs the query in the active query log. The active query log
// guarantees that we don't run more than MaxConcurrent queries at once.
func (ng *Engine) queueActive(ctx context.Context, q *query) (func(), error) {
	if ng.activeQueryTracker == nil {
		return func() {}, nil
	}
	queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
	queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q)
	queueSpanTimer.Finish()
	return func() { ng.activeQueryTracker.Delete(queryIndex) }, err
}

func timeMilliseconds(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
}

func durationMilliseconds(d time.Duration) int64 {
	return int64(d / (time.Millisecond / time.Nanosecond))
}

// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, annotations.Annotations, error) {
	prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
	mint, maxt := FindMinMaxTime(s)
	querier, err := query.queryable.Querier(mint, maxt)
	if err != nil {
		prepareSpanTimer.Finish()
		return nil, nil, err
	}
	defer querier.Close()

	ng.populateSeries(ctxPrepare, querier, s)
	prepareSpanTimer.Finish()

	// Modify the offset of vector and matrix selectors for the @ modifier
	// w.r.t. the start time since only 1 evaluation will be done on them.
	setOffsetForAtModifier(timeMilliseconds(s.Start), s.Expr)
	evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
	// Instant evaluation. This is executed as a range evaluation with one step.
	if s.Start == s.End && s.Interval == 0 {
		start := timeMilliseconds(s.Start)
		evaluator := &evaluator{
			startTimestamp:           start,
			endTimestamp:             start,
			interval:                 1,
			ctx:                      ctxInnerEval,
			maxSamples:               ng.maxSamplesPerQuery,
			logger:                   ng.logger,
			lookbackDelta:            s.LookbackDelta,
			samplesStats:             query.sampleStats,
			noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
		}
		query.sampleStats.InitStepTracking(start, start, 1)

		val, warnings, err := evaluator.Eval(s.Expr)

		evalSpanTimer.Finish()

		if err != nil {
			return nil, warnings, err
		}

		var mat Matrix

		switch result := val.(type) {
		case Matrix:
			mat = result
		case String:
			return result, warnings, nil
		default:
			panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
		}

		query.matrix = mat
		switch s.Expr.Type() {
		case parser.ValueTypeVector:
			// Convert matrix with one value per series into vector.
			vector := make(Vector, len(mat))
			for i, s := range mat {
				// The point might have a different timestamp; force it to the evaluation
				// timestamp, as that is when we ran the evaluation.
				if len(s.Histograms) > 0 {
					vector[i] = Sample{Metric: s.Metric, H: s.Histograms[0].H, T: start}
				} else {
					vector[i] = Sample{Metric: s.Metric, F: s.Floats[0].F, T: start}
				}
			}
			return vector, warnings, nil
		case parser.ValueTypeScalar:
			return Scalar{V: mat[0].Floats[0].F, T: start}, warnings, nil
		case parser.ValueTypeMatrix:
			return mat, warnings, nil
		default:
			panic(fmt.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type()))
		}
	}

	// Range evaluation.
	evaluator := &evaluator{
		startTimestamp:           timeMilliseconds(s.Start),
		endTimestamp:             timeMilliseconds(s.End),
		interval:                 durationMilliseconds(s.Interval),
		ctx:                      ctxInnerEval,
		maxSamples:               ng.maxSamplesPerQuery,
		logger:                   ng.logger,
		lookbackDelta:            s.LookbackDelta,
		samplesStats:             query.sampleStats,
		noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
	}
	query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval)
	val, warnings, err := evaluator.Eval(s.Expr)

	evalSpanTimer.Finish()

	if err != nil {
		return nil, warnings, err
	}

	mat, ok := val.(Matrix)
	if !ok {
		panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
	}
	query.matrix = mat

	if err := contextDone(ctx, "expression evaluation"); err != nil {
		return nil, warnings, err
	}

	// TODO(fabxc): where to ensure metric labels are a copy from the storage internals.
	sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort)
	sort.Sort(mat)
	sortSpanTimer.Finish()

	return mat, warnings, nil
}

// subqueryTimes returns the sum of offsets and ranges of all subqueries in the path.
// If the @ modifier is used, then the offset and range are w.r.t. that timestamp
// (i.e. the sum is reset when we encounter an @ modifier).
// The returned *int64 is the closest timestamp that was seen; it is nil if no @ modifier was used.
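// For example, for the path of the inner selector in
// max_over_time(rate(foo[2m])[10m:1m] offset 5m), it returns an offset of 5m,
// a range of 10m, and a nil timestamp.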
func subqueryTimes(path []parser.Node) (time.Duration, time.Duration, *int64) {
	var (
		subqOffset, subqRange time.Duration
		ts                    int64 = math.MaxInt64
	)
	for _, node := range path {
		if n, ok := node.(*parser.SubqueryExpr); ok {
			subqOffset += n.OriginalOffset
			subqRange += n.Range
			if n.Timestamp != nil {
				// The @ modifier on a subquery invalidates the offset and
				// range accumulated so far. Hence reset them here.
				subqOffset = n.OriginalOffset
				subqRange = n.Range
				ts = *n.Timestamp
			}
		}
	}
	var tsp *int64
	if ts != math.MaxInt64 {
		tsp = &ts
	}
	return subqOffset, subqRange, tsp
}

// FindMinMaxTime returns the time in milliseconds of the earliest and latest point in time the statement will try to process.
// This takes into account offsets, @ modifiers, and range selectors.
// If the statement does not select series, then FindMinMaxTime returns (0, 0).
func FindMinMaxTime(s *parser.EvalStmt) (int64, int64) {
	var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64
	// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
	// The evaluation of the VectorSelector inside then evaluates the given range and unsets
	// the variable.
	var evalRange time.Duration
	parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
		switch n := node.(type) {
		case *parser.VectorSelector:
			start, end := getTimeRangesForSelector(s, n, path, evalRange)
			if start < minTimestamp {
				minTimestamp = start
			}
			if end > maxTimestamp {
				maxTimestamp = end
			}
			evalRange = 0
		case *parser.MatrixSelector:
			evalRange = n.Range
		}
		return nil
	})

	if maxTimestamp == math.MinInt64 {
		// This happens when there was no selector. Hence no time range to select.
		minTimestamp = 0
		maxTimestamp = 0
	}

	return minTimestamp, maxTimestamp
}

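// getTimeRangesForSelector returns the time range (as millisecond timestamps)
// that needs to be selected for the given vector selector, accounting for the
// lookback delta, the evaluated range, offsets, and any enclosing subqueries
// or @ modifiers. For example, with a lookback delta of 5m and an evaluation
// time T, a plain selector foo selects [T-5m, T], while the selector inside
// rate(foo[5m] offset 1m) selects [T-6m, T-1m].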
func getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorSelector, path []parser.Node, evalRange time.Duration) (int64, int64) {
	start, end := timestamp.FromTime(s.Start), timestamp.FromTime(s.End)
	subqOffset, subqRange, subqTs := subqueryTimes(path)

	if subqTs != nil {
		// The timestamp on the subquery overrides the eval statement time ranges.
		start = *subqTs
		end = *subqTs
	}

	if n.Timestamp != nil {
		// The timestamp on the selector overrides everything.
		start = *n.Timestamp
		end = *n.Timestamp
	} else {
		offsetMilliseconds := durationMilliseconds(subqOffset)
		start = start - offsetMilliseconds - durationMilliseconds(subqRange)
		end -= offsetMilliseconds
	}

	if evalRange == 0 {
		start -= durationMilliseconds(s.LookbackDelta)
	} else {
		// For all matrix queries we want to ensure that we have (end-start) + range selected;
		// this way we have `range` data before the start time.
		start -= durationMilliseconds(evalRange)
	}

	offsetMilliseconds := durationMilliseconds(n.OriginalOffset)
	start -= offsetMilliseconds
	end -= offsetMilliseconds

	return start, end
}

func (ng *Engine) getLastSubqueryInterval(path []parser.Node) time.Duration {
	var interval time.Duration
	for _, node := range path {
		if n, ok := node.(*parser.SubqueryExpr); ok {
			interval = n.Step
			if n.Step == 0 {
				interval = time.Duration(ng.noStepSubqueryIntervalFn(durationMilliseconds(n.Range))) * time.Millisecond
			}
		}
	}
	return interval
}

func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s *parser.EvalStmt) {
	// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
	// The evaluation of the VectorSelector inside then evaluates the given range and unsets
	// the variable.
	var evalRange time.Duration

	parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
		switch n := node.(type) {
		case *parser.VectorSelector:
			start, end := getTimeRangesForSelector(s, n, path, evalRange)
			interval := ng.getLastSubqueryInterval(path)
			if interval == 0 {
				interval = s.Interval
			}
			hints := &storage.SelectHints{
				Start: start,
				End:   end,
				Step:  durationMilliseconds(interval),
				Range: durationMilliseconds(evalRange),
				Func:  extractFuncFromPath(path),
			}
			evalRange = 0
			hints.By, hints.Grouping = extractGroupsFromPath(path)
			n.UnexpandedSeriesSet = querier.Select(ctx, false, hints, n.LabelMatchers...)

		case *parser.MatrixSelector:
			evalRange = n.Range
		}
		return nil
	})
}

// extractFuncFromPath walks up the path and searches for the first instance of
// a function or aggregation.
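// For example, for the path of the selector inside sum(rate(foo[5m])) it
// returns "rate".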
func extractFuncFromPath(p []parser.Node) string {
	if len(p) == 0 {
		return ""
	}
	switch n := p[len(p)-1].(type) {
	case *parser.AggregateExpr:
		return n.Op.String()
	case *parser.Call:
		return n.Func.Name
	case *parser.BinaryExpr:
		// If we hit a binary expression we terminate since we only care about functions
		// or aggregations over a single metric.
		return ""
	}
	return extractFuncFromPath(p[:len(p)-1])
}

// extractGroupsFromPath parses the outer aggregation expression, if any, and extracts the grouping information if by or without was used.
func extractGroupsFromPath(p []parser.Node) (bool, []string) {
	if len(p) == 0 {
		return false, nil
	}
	if n, ok := p[len(p)-1].(*parser.AggregateExpr); ok {
		return !n.Without, n.Grouping
	}
	return false, nil
}

func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations.Annotations, error) {
	switch e := expr.(type) {
	case *parser.MatrixSelector:
		return checkAndExpandSeriesSet(ctx, e.VectorSelector)
	case *parser.VectorSelector:
		if e.Series != nil {
			return nil, nil
		}
		series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet)
		e.Series = series
		return ws, err
	}
	return nil, nil
}

func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, ws annotations.Annotations, err error) {
	for it.Next() {
		select {
		case <-ctx.Done():
			return nil, nil, ctx.Err()
		default:
		}
		res = append(res, it.At())
	}
	return res, it.Warnings(), it.Err()
}

type errWithWarnings struct {
	err      error
	warnings annotations.Annotations
}

func (e errWithWarnings) Error() string { return e.err.Error() }

// An evaluator evaluates the given expressions over the given fixed
// timestamps. It is attached to an engine through which it connects to a
// querier and reports errors. On timeout or cancellation of its context it
// terminates.
type evaluator struct {
	ctx context.Context

	startTimestamp int64 // Start time in milliseconds.
	endTimestamp   int64 // End time in milliseconds.
	interval       int64 // Interval in milliseconds.

	maxSamples               int
	currentSamples           int
	logger                   log.Logger
	lookbackDelta            time.Duration
	samplesStats             *stats.QuerySamples
	noStepSubqueryIntervalFn func(rangeMillis int64) int64
}

// errorf causes a panic with the input formatted into an error.
func (ev *evaluator) errorf(format string, args ...interface{}) {
	ev.error(fmt.Errorf(format, args...))
}

// error causes a panic with the given error.
func (ev *evaluator) error(err error) {
	panic(err)
}

// recover is the handler that turns panics into returns from the top level of evaluation.
func (ev *evaluator) recover(expr parser.Expr, ws *annotations.Annotations, errp *error) {
	e := recover()
	if e == nil {
		return
	}

	switch err := e.(type) {
	case runtime.Error:
		// Print the stack trace but do not inhibit the running application.
		buf := make([]byte, 64<<10)
		buf = buf[:runtime.Stack(buf, false)]

		level.Error(ev.logger).Log("msg", "runtime panic in parser", "expr", expr.String(), "err", e, "stacktrace", string(buf))
		*errp = fmt.Errorf("unexpected error: %w", err)
	case errWithWarnings:
		*errp = err.err
		ws.Merge(err.warnings)
	case error:
		*errp = err
	default:
		*errp = fmt.Errorf("%v", err)
	}
}

func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) {
	defer ev.recover(expr, &ws, &err)

	v, ws = ev.eval(expr)
	return v, ws, nil
}

// EvalSeriesHelper stores extra information about a series.
type EvalSeriesHelper struct {
	// The grouping key used by aggregation.
	groupingKey uint64
	// Used to map left-hand to right-hand in binary operations.
	signature string
}

// EvalNodeHelper stores extra information and caches for evaluating a single node across steps.
type EvalNodeHelper struct {
	// Evaluation timestamp.
	Ts int64
	// Vector that can be used for output.
	Out Vector

	// Caches.
	// DropMetricName and label_*.
	Dmn map[uint64]labels.Labels
	// funcHistogramQuantile for classic histograms.
	signatureToMetricWithBuckets map[string]*metricWithBuckets
	// label_replace.
	regex *regexp.Regexp

	lb           *labels.Builder
	lblBuf       []byte
	lblResultBuf []byte

	// For binary vector matching.
	rightSigs    map[string]Sample
	matchedSigs  map[string]map[uint64]struct{}
	resultMetric map[string]labels.Labels
}

func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) {
	if enh.lb == nil {
		enh.lb = labels.NewBuilder(lbls)
	} else {
		enh.lb.Reset(lbls)
	}
}

// DropMetricName is a cached version of dropMetricName.
func (enh *EvalNodeHelper) DropMetricName(l labels.Labels) labels.Labels {
	if enh.Dmn == nil {
		enh.Dmn = make(map[uint64]labels.Labels, len(enh.Out))
	}
	h := l.Hash()
	ret, ok := enh.Dmn[h]
	if ok {
		return ret
	}
	ret = dropMetricName(l)
	enh.Dmn[h] = ret
	return ret
}

// rangeEval evaluates the given expressions, and then for each step calls
// the given funcCall with the values computed for each expression at that
// step. The return value is the combination of all the function call results,
// assembled into time series.
// The prepSeries function (if provided) is called once per input series to
// prepare an EvalSeriesHelper, which is then passed to each funcCall invocation.
func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) {
	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
	matrixes := make([]Matrix, len(exprs))
	origMatrixes := make([]Matrix, len(exprs))
	originalNumSamples := ev.currentSamples

	var warnings annotations.Annotations
	for i, e := range exprs {
		// Functions will take string arguments from the expressions, not the values.
		if e != nil && e.Type() != parser.ValueTypeString {
			// ev.currentSamples will be updated to the correct value within the ev.eval call.
			val, ws := ev.eval(e)
			warnings.Merge(ws)
			matrixes[i] = val.(Matrix)

			// Keep a copy of the original point slices so that they
			// can be returned to the pool.
			origMatrixes[i] = make(Matrix, len(matrixes[i]))
			copy(origMatrixes[i], matrixes[i])
		}
	}

	vectors := make([]Vector, len(exprs))    // Input vectors for the function.
	args := make([]parser.Value, len(exprs)) // Argument to function.
	// Create an output vector that is as big as the input matrix with
	// the most time series.
	biggestLen := 1
	for i := range exprs {
		vectors[i] = make(Vector, 0, len(matrixes[i]))
		if len(matrixes[i]) > biggestLen {
			biggestLen = len(matrixes[i])
		}
	}
	enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
	type seriesAndTimestamp struct {
		Series
		ts int64
	}
	seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash.
	tempNumSamples := ev.currentSamples

	var (
		seriesHelpers [][]EvalSeriesHelper
		bufHelpers    [][]EvalSeriesHelper // Buffer updated on each step
	)

	// If the series preparation function is provided, we should run it for
	// every single series in the matrix.
	if prepSeries != nil {
		seriesHelpers = make([][]EvalSeriesHelper, len(exprs))
		bufHelpers = make([][]EvalSeriesHelper, len(exprs))

		for i := range exprs {
			seriesHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))
			bufHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))

			for si, series := range matrixes[i] {
				prepSeries(series.Metric, &seriesHelpers[i][si])
			}
		}
	}

	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
		if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
			ev.error(err)
		}
		// Reset number of samples in memory after each timestamp.
		ev.currentSamples = tempNumSamples
		// Gather input vectors for this timestamp.
		for i := range exprs {
			vectors[i] = vectors[i][:0]

			if prepSeries != nil {
				bufHelpers[i] = bufHelpers[i][:0]
			}

			for si, series := range matrixes[i] {
				switch {
				case len(series.Floats) > 0 && series.Floats[0].T == ts:
					vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
					// Move input vectors forward so we don't have to re-scan the same
					// past points at the next step.
					matrixes[i][si].Floats = series.Floats[1:]
				case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
					vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
					matrixes[i][si].Histograms = series.Histograms[1:]
				default:
					continue
				}
				if prepSeries != nil {
					bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
				}
				ev.currentSamples++
				if ev.currentSamples > ev.maxSamples {
					ev.error(ErrTooManySamples(env))
				}
			}
			args[i] = vectors[i]
			ev.samplesStats.UpdatePeak(ev.currentSamples)
		}

		// Make the function call.
		enh.Ts = ts
		result, ws := funcCall(args, bufHelpers, enh)
		enh.Out = result[:0] // Reuse result vector.
		warnings.Merge(ws)

		vecNumSamples := result.TotalSamples()
		ev.currentSamples += vecNumSamples
		// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
		// needs to include the samples from the result here, as they're still in memory.
		tempNumSamples += vecNumSamples
		ev.samplesStats.UpdatePeak(ev.currentSamples)

		if ev.currentSamples > ev.maxSamples {
			ev.error(ErrTooManySamples(env))
		}
		ev.samplesStats.UpdatePeak(ev.currentSamples)

		// If this could be an instant query, shortcut so as not to change sort order.
		if ev.endTimestamp == ev.startTimestamp {
			if result.ContainsSameLabelset() {
				ev.errorf("vector cannot contain metrics with the same labelset")
			}
			mat := make(Matrix, len(result))
			for i, s := range result {
				if s.H == nil {
					mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}}
				} else {
					mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}}
				}
			}
			ev.currentSamples = originalNumSamples + mat.TotalSamples()
			ev.samplesStats.UpdatePeak(ev.currentSamples)
			return mat, warnings
		}

		// Add samples in output vector to output series.
		for _, sample := range result {
			h := sample.Metric.Hash()
			ss, ok := seriess[h]
			if ok {
				if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate.
					ev.errorf("vector cannot contain metrics with the same labelset")
				}
				ss.ts = ts
			} else {
				ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts}
			}
			if sample.H == nil {
				if ss.Floats == nil {
					ss.Floats = getFPointSlice(numSteps)
				}
				ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F})
			} else {
				if ss.Histograms == nil {
					ss.Histograms = getHPointSlice(numSteps)
				}
				ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H})
			}
			seriess[h] = ss
		}
	}

	// Reuse the original point slices.
	for _, m := range origMatrixes {
		for _, s := range m {
			putFPointSlice(s.Floats)
			putHPointSlice(s.Histograms)
		}
	}
	// Assemble the output matrix. By the time we get here we know we don't have too many samples.
	mat := make(Matrix, 0, len(seriess))
	for _, ss := range seriess {
		mat = append(mat, ss.Series)
	}
	ev.currentSamples = originalNumSamples + mat.TotalSamples()
	ev.samplesStats.UpdatePeak(ev.currentSamples)
	return mat, warnings
}

// evalSubquery evaluates the given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
	samplesStats := ev.samplesStats
	// Avoid double counting samples when running a subquery; those samples will be counted in a later stage.
	ev.samplesStats = ev.samplesStats.NewChild()
	val, ws := ev.eval(subq)
	// But do incorporate the peak from the subquery.
	samplesStats.UpdatePeakFromSubquery(ev.samplesStats)
	ev.samplesStats = samplesStats
	mat := val.(Matrix)
	vs := &parser.VectorSelector{
		OriginalOffset: subq.OriginalOffset,
		Offset:         subq.Offset,
		Series:         make([]storage.Series, 0, len(mat)),
		Timestamp:      subq.Timestamp,
	}
	if subq.Timestamp != nil {
		// The offset of the subquery is not modified in case of the @ modifier.
		// Hence we take care of that here for the result.
		vs.Offset = subq.OriginalOffset + time.Duration(ev.startTimestamp-*subq.Timestamp)*time.Millisecond
	}
	ms := &parser.MatrixSelector{
		Range:          subq.Range,
		VectorSelector: vs,
	}
	for _, s := range mat {
		vs.Series = append(vs.Series, NewStorageSeries(s))
	}
	return ms, mat.TotalSamples(), ws
}

// eval evaluates the given expression as the given AST expression node requires.
func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotations) {
	// This is the top-level evaluation method.
	// Thus, we check for timeout/cancellation here.
	if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
		ev.error(err)
	}
	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1

	// Create a new span to help investigate inner evaluation performance.
	ctxWithSpan, span := otel.Tracer("").Start(ev.ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String())
	ev.ctx = ctxWithSpan
	defer span.End()

	switch e := expr.(type) {
	case *parser.AggregateExpr:
		// Grouping labels must be sorted (expected both by generateGroupingKey() and aggregation()).
		sortedGrouping := e.Grouping
		slices.Sort(sortedGrouping)

		// Prepare a function to initialise series helpers with the grouping key.
		buf := make([]byte, 0, 1024)
		initSeries := func(series labels.Labels, h *EvalSeriesHelper) {
			h.groupingKey, buf = generateGroupingKey(series, sortedGrouping, e.Without, buf)
		}

		unwrapParenExpr(&e.Param)
		param := unwrapStepInvariantExpr(e.Param)
		unwrapParenExpr(&param)
		if s, ok := param.(*parser.StringLiteral); ok {
			return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
				return ev.aggregation(e, sortedGrouping, s.Val, v[0].(Vector), sh[0], enh)
			}, e.Expr)
		}

		return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
			var param float64
			if e.Param != nil {
				param = v[0].(Vector)[0].F
			}
			return ev.aggregation(e, sortedGrouping, param, v[1].(Vector), sh[1], enh)
		}, e.Param, e.Expr)

	case *parser.Call:
		call := FunctionCalls[e.Func.Name]
		if e.Func.Name == "timestamp" {
			// Matrix evaluation always returns the evaluation time,
			// so this function needs special handling when given
			// a vector selector.
			unwrapParenExpr(&e.Args[0])
			arg := unwrapStepInvariantExpr(e.Args[0])
			unwrapParenExpr(&arg)
			vs, ok := arg.(*parser.VectorSelector)
			if ok {
				return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e)
			}
		}

		// Check if the function has a matrix argument.
		var (
			matrixArgIndex int
			matrixArg      bool
			warnings       annotations.Annotations
		)
		for i := range e.Args {
			unwrapParenExpr(&e.Args[i])
			a := unwrapStepInvariantExpr(e.Args[i])
			unwrapParenExpr(&a)
			if _, ok := a.(*parser.MatrixSelector); ok {
				matrixArgIndex = i
				matrixArg = true
				break
			}
			// parser.SubqueryExpr can be used in place of parser.MatrixSelector.
			if subq, ok := a.(*parser.SubqueryExpr); ok {
				matrixArgIndex = i
				matrixArg = true
				// Replacing parser.SubqueryExpr with parser.MatrixSelector.
				val, totalSamples, ws := ev.evalSubquery(subq)
				e.Args[i] = val
				warnings.Merge(ws)
				defer func() {
					// The subquery result takes space in memory; release it at the end.
					val.VectorSelector.(*parser.VectorSelector).Series = nil
					ev.currentSamples -= totalSamples
				}()
				break
			}
		}
		if !matrixArg {
			// Does not have a matrix argument.
			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
				vec, annos := call(v, e.Args, enh)
				return vec, warnings.Merge(annos)
			}, e.Args...)
		}

		inArgs := make([]parser.Value, len(e.Args))
		// Evaluate any non-matrix arguments.
		otherArgs := make([]Matrix, len(e.Args))
		otherInArgs := make([]Vector, len(e.Args))
		for i, e := range e.Args {
			if i != matrixArgIndex {
				val, ws := ev.eval(e)
				otherArgs[i] = val.(Matrix)
				otherInArgs[i] = Vector{Sample{}}
				inArgs[i] = otherInArgs[i]
				warnings.Merge(ws)
			}
		}

		unwrapParenExpr(&e.Args[matrixArgIndex])
		arg := unwrapStepInvariantExpr(e.Args[matrixArgIndex])
		unwrapParenExpr(&arg)
		sel := arg.(*parser.MatrixSelector)
		selVS := sel.VectorSelector.(*parser.VectorSelector)

		ws, err := checkAndExpandSeriesSet(ev.ctx, sel)
		warnings.Merge(ws)
		if err != nil {
			ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), warnings})
		}
		mat := make(Matrix, 0, len(selVS.Series)) // Output matrix.
		offset := durationMilliseconds(selVS.Offset)
		selRange := durationMilliseconds(sel.Range)
		stepRange := selRange
		if stepRange > ev.interval {
			stepRange = ev.interval
		}
		// Reuse objects across steps to save memory allocations.
		var floats []FPoint
		var histograms []HPoint
		inMatrix := make(Matrix, 1)
		inArgs[matrixArgIndex] = inMatrix
		enh := &EvalNodeHelper{Out: make(Vector, 0, 1)}
		// Process all the calls for one time series at a time.
		it := storage.NewBuffer(selRange)
		var chkIter chunkenc.Iterator
		for i, s := range selVS.Series {
			ev.currentSamples -= len(floats) + totalHPointSize(histograms)
			if floats != nil {
				floats = floats[:0]
			}
			if histograms != nil {
				histograms = histograms[:0]
			}
			chkIter = s.Iterator(chkIter)
			it.Reset(chkIter)
			metric := selVS.Series[i].Labels()
			// The last_over_time function acts like offset; thus, it
			// should keep the metric name.  For all the other range
			// vector functions, the only change needed is to drop the
			// metric name in the output.
			if e.Func.Name != "last_over_time" {
				metric = dropMetricName(metric)
			}
			ss := Series{
				Metric: metric,
			}
			inMatrix[0].Metric = selVS.Series[i].Labels()
			for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
				step++
				// Set the non-matrix arguments.
				// They are scalar, so it is safe to use the step number
				// when looking up the argument, as there will be no gaps.
				for j := range e.Args {
					if j != matrixArgIndex {
						otherInArgs[j][0].F = otherArgs[j][0].Floats[step].F
					}
				}
				maxt := ts - offset
				mint := maxt - selRange
				// Evaluate the matrix selector for this series for this step.
				floats, histograms = ev.matrixIterSlice(it, mint, maxt, floats, histograms)
				if len(floats)+len(histograms) == 0 {
					continue
				}
				inMatrix[0].Floats = floats
				inMatrix[0].Histograms = histograms
				enh.Ts = ts
				// Make the function call.
				outVec, annos := call(inArgs, e.Args, enh)
				warnings.Merge(annos)
				ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+totalHPointSize(histograms)))

				enh.Out = outVec[:0]
				if len(outVec) > 0 {
					if outVec[0].H == nil {
						if ss.Floats == nil {
							ss.Floats = getFPointSlice(numSteps)
						}
						ss.Floats = append(ss.Floats, FPoint{F: outVec[0].F, T: ts})
					} else {
						if ss.Histograms == nil {
							ss.Histograms = getHPointSlice(numSteps)
						}
						ss.Histograms = append(ss.Histograms, HPoint{H: outVec[0].H, T: ts})
					}
				}
				// Only buffer stepRange milliseconds from the second step on.
				it.ReduceDelta(stepRange)
			}
			histSamples := totalHPointSize(ss.Histograms)
			if len(ss.Floats)+histSamples > 0 {
				if ev.currentSamples+len(ss.Floats)+histSamples <= ev.maxSamples {
					mat = append(mat, ss)
					ev.currentSamples += len(ss.Floats) + histSamples
				} else {
					ev.error(ErrTooManySamples(env))
				}
			}
			ev.samplesStats.UpdatePeak(ev.currentSamples)
		}
		ev.samplesStats.UpdatePeak(ev.currentSamples)

		ev.currentSamples -= len(floats) + totalHPointSize(histograms)
		putFPointSlice(floats)
		putHPointSlice(histograms)

		// The absent_over_time function returns 0 or 1 series. At this point, the
		// matrix may contain multiple series. The following code creates a new series
		// with a value of 1 for the timestamps where no series has a value.
		if e.Func.Name == "absent_over_time" {
			steps := int(1 + (ev.endTimestamp-ev.startTimestamp)/ev.interval)
			// Iterate once to look for a complete series.
			for _, s := range mat {
				if len(s.Floats)+len(s.Histograms) == steps {
					return Matrix{}, warnings
				}
			}

			found := map[int64]struct{}{}

			for i, s := range mat {
				for _, p := range s.Floats {
					found[p.T] = struct{}{}
				}
				for _, p := range s.Histograms {
					found[p.T] = struct{}{}
				}
				if i > 0 && len(found) == steps {
					return Matrix{}, warnings
				}
			}

			newp := make([]FPoint, 0, steps-len(found))
			for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
				if _, ok := found[ts]; !ok {
					newp = append(newp, FPoint{T: ts, F: 1})
				}
			}

			return Matrix{
				Series{
					Metric: createLabelsForAbsentFunction(e.Args[0]),
					Floats: newp,
				},
			}, warnings
		}

		if mat.ContainsSameLabelset() {
			ev.errorf("vector cannot contain metrics with the same labelset")
		}

		return mat, warnings

	case *parser.ParenExpr:
		return ev.eval(e.Expr)

	case *parser.UnaryExpr:
		val, ws := ev.eval(e.Expr)
		mat := val.(Matrix)
		if e.Op == parser.SUB {
			for i := range mat {
				mat[i].Metric = dropMetricName(mat[i].Metric)
				for j := range mat[i].Floats {
					mat[i].Floats[j].F = -mat[i].Floats[j].F
				}
			}
			if mat.ContainsSameLabelset() {
				ev.errorf("vector cannot contain metrics with the same labelset")
			}
		}
		return mat, ws

	case *parser.BinaryExpr:
		switch lt, rt := e.LHS.Type(), e.RHS.Type(); {
		case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar:
			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
				val := scalarBinop(e.Op, v[0].(Vector)[0].F, v[1].(Vector)[0].F)
				return append(enh.Out, Sample{F: val}), nil
			}, e.LHS, e.RHS)
		case lt == parser.ValueTypeVector && rt == parser.ValueTypeVector:
			// Function to compute the join signature for each series.
			buf := make([]byte, 0, 1024)
			sigf := signatureFunc(e.VectorMatching.On, buf, e.VectorMatching.MatchingLabels...)
			initSignatures := func(series labels.Labels, h *EvalSeriesHelper) {
				h.signature = sigf(series)
			}
			switch e.Op {
			case parser.LAND:
				return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
					return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
				}, e.LHS, e.RHS)
			case parser.LOR:
				return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
					return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
				}, e.LHS, e.RHS)
			case parser.LUNLESS:
				return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
					return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
				}, e.LHS, e.RHS)
			default:
				return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
					return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, sh[0], sh[1], enh), nil
				}, e.LHS, e.RHS)
			}

		case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar:
			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
				return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].F}, false, e.ReturnBool, enh), nil
			}, e.LHS, e.RHS)

		case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector:
			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
				return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].F}, true, e.ReturnBool, enh), nil
			}, e.LHS, e.RHS)
		}

	case *parser.NumberLiteral:
		return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
			return append(enh.Out, Sample{F: e.Val, Metric: labels.EmptyLabels()}), nil
		})

	case *parser.StringLiteral:
		return String{V: e.Val, T: ev.startTimestamp}, nil

	case *parser.VectorSelector:
		ws, err := checkAndExpandSeriesSet(ev.ctx, e)
		if err != nil {
			ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
		}
		mat := make(Matrix, 0, len(e.Series))
		it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
		var chkIter chunkenc.Iterator
		for i, s := range e.Series {
			chkIter = s.Iterator(chkIter)
			it.Reset(chkIter)
			ss := Series{
				Metric: e.Series[i].Labels(),
			}

			for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
				step++
				_, f, h, ok := ev.vectorSelectorSingle(it, e, ts)
				if ok {
					if ev.currentSamples < ev.maxSamples {
						if h == nil {
							if ss.Floats == nil {
								ss.Floats = getFPointSlice(numSteps)
							}
							ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
							ev.currentSamples++
							ev.samplesStats.IncrementSamplesAtStep(step, 1)
						} else {
							if ss.Histograms == nil {
								ss.Histograms = getHPointSlice(numSteps)
							}
							point := HPoint{H: h, T: ts}
							ss.Histograms = append(ss.Histograms, point)
							histSize := point.size()
							ev.currentSamples += histSize
							ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize))
						}
					} else {
						ev.error(ErrTooManySamples(env))
					}
				}
			}

			if len(ss.Floats)+len(ss.Histograms) > 0 {
				mat = append(mat, ss)
			}
		}
		ev.samplesStats.UpdatePeak(ev.currentSamples)
		return mat, ws

	case *parser.MatrixSelector:
		if ev.startTimestamp != ev.endTimestamp {
			panic(errors.New("cannot do range evaluation of matrix selector"))
		}
		return ev.matrixSelector(e)

	case *parser.SubqueryExpr:
		offsetMillis := durationMilliseconds(e.Offset)
		rangeMillis := durationMilliseconds(e.Range)
		newEv := &evaluator{
			endTimestamp:             ev.endTimestamp - offsetMillis,
			ctx:                      ev.ctx,
			currentSamples:           ev.currentSamples,
			maxSamples:               ev.maxSamples,
			logger:                   ev.logger,
			lookbackDelta:            ev.lookbackDelta,
			samplesStats:             ev.samplesStats.NewChild(),
			noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
		}

		if e.Step != 0 {
			newEv.interval = durationMilliseconds(e.Step)
		} else {
			newEv.interval = ev.noStepSubqueryIntervalFn(rangeMillis)
		}

		// Start with the first timestamp at or after (ev.startTimestamp - offset - range)
		// that is aligned with the step (a multiple of 'newEv.interval').
		newEv.startTimestamp = newEv.interval * ((ev.startTimestamp - offsetMillis - rangeMillis) / newEv.interval)
		if newEv.startTimestamp < (ev.startTimestamp - offsetMillis - rangeMillis) {
			newEv.startTimestamp += newEv.interval
		}
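		// For example, with ev.startTimestamp-offsetMillis-rangeMillis = 910 (ms) and
		// an interval of 60ms, the aligned start is 60*(910/60) = 900ms, which is then
		// bumped to 960ms, the first multiple of the interval at or after 910ms.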

		if newEv.startTimestamp != ev.startTimestamp {
			// Adjust the offset of selectors based on the new
			// start time of the evaluator since the calculation
			// of the offset with @ happens w.r.t. the start time.
			setOffsetForAtModifier(newEv.startTimestamp, e.Expr)
		}

		res, ws := newEv.eval(e.Expr)
		ev.currentSamples = newEv.currentSamples
		ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
		ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples)
		return res, ws
	case *parser.StepInvariantExpr:
		switch ce := e.Expr.(type) {
		case *parser.StringLiteral, *parser.NumberLiteral:
			return ev.eval(ce)
		}

		newEv := &evaluator{
			startTimestamp:           ev.startTimestamp,
			endTimestamp:             ev.startTimestamp, // Always a single evaluation.
			interval:                 ev.interval,
			ctx:                      ev.ctx,
			currentSamples:           ev.currentSamples,
			maxSamples:               ev.maxSamples,
			logger:                   ev.logger,
			lookbackDelta:            ev.lookbackDelta,
			samplesStats:             ev.samplesStats.NewChild(),
			noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
		}
		res, ws := newEv.eval(e.Expr)
		ev.currentSamples = newEv.currentSamples
		ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
		for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
			step++
			ev.samplesStats.IncrementSamplesAtStep(step, newEv.samplesStats.TotalSamples)
		}
		switch e.Expr.(type) {
		case *parser.MatrixSelector, *parser.SubqueryExpr:
			// We do not duplicate results for range selectors since the result is a matrix
			// with its own unique timestamps, which do not depend on the step.
			return res, ws
		}

		// The value is the same at every step, but its timestamp changes with the
		// eval time. Hence we duplicate the single result sample for each step with
		// adjusted timestamps.
		mat, ok := res.(Matrix)
		if !ok {
			panic(fmt.Errorf("unexpected result in StepInvariantExpr evaluation: %T", expr))
		}
		for i := range mat {
			if len(mat[i].Floats)+len(mat[i].Histograms) != 1 {
				panic(errors.New("unexpected number of samples"))
			}
			for ts := ev.startTimestamp + ev.interval; ts <= ev.endTimestamp; ts += ev.interval {
				if len(mat[i].Floats) > 0 {
					mat[i].Floats = append(mat[i].Floats, FPoint{
						T: ts,
						F: mat[i].Floats[0].F,
					})
					ev.currentSamples++
				} else {
					point := HPoint{
						T: ts,
						H: mat[i].Histograms[0].H,
					}
					mat[i].Histograms = append(mat[i].Histograms, point)
					ev.currentSamples += point.size()
				}
				if ev.currentSamples > ev.maxSamples {
					ev.error(ErrTooManySamples(env))
				}
			}
		}
		ev.samplesStats.UpdatePeak(ev.currentSamples)
		return res, ws
	}

	panic(fmt.Errorf("unhandled expression of type: %T", expr))
}

func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) {
	ws, err := checkAndExpandSeriesSet(ev.ctx, vs)
	if err != nil {
		ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
	}

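	// Create one memoized iterator per series up front so that every evaluation
	// step below reuses them instead of re-creating iterators per step.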
	seriesIterators := make([]*storage.MemoizedSeriesIterator, len(vs.Series))
	for i, s := range vs.Series {
		it := s.Iterator(nil)
		seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta))
	}

	return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
		if vs.Timestamp != nil {
			// This is a special case for "timestamp()" when the @ modifier is used, to ensure that
			// we return a point for each time step in this case.
			// See https://github.com/prometheus/prometheus/issues/8433.
			vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond
		}

		vec := make(Vector, 0, len(vs.Series))
		for i, s := range vs.Series {
			it := seriesIterators[i]
			t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts)
			if ok {
				vec = append(vec, Sample{
					Metric: s.Labels(),
					T:      t,
					F:      f,
					H:      h,
				})
				histSize := 0
				if h != nil {
					histSize = h.Size() / 16 // 16 bytes per sample.
					ev.currentSamples += histSize
				}
				ev.currentSamples++

				ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, int64(1+histSize))
				if ev.currentSamples > ev.maxSamples {
					ev.error(ErrTooManySamples(env))
				}
			}
		}
		ev.samplesStats.UpdatePeak(ev.currentSamples)
		vec, annos := call([]parser.Value{vec}, e.Args, enh)
		return vec, ws.Merge(annos)
	})
}

// vectorSelectorSingle evaluates an instant vector for the iterator of one time series.
func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (
	int64, float64, *histogram.FloatHistogram, bool,
) {
	refTime := ts - durationMilliseconds(node.Offset)
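	// refTime is the evaluation timestamp adjusted for the selector's offset. The
	// returned sample is the latest one at or before refTime, but no older than
	// refTime minus the lookback delta (stale markers yield no sample).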
	var t int64
	var v float64
	var h *histogram.FloatHistogram

	valueType := it.Seek(refTime)
	switch valueType {
	case chunkenc.ValNone:
		if it.Err() != nil {
			ev.error(it.Err())
		}
	case chunkenc.ValFloat:
		t, v = it.At()
	case chunkenc.ValFloatHistogram:
		t, h = it.AtFloatHistogram()
	default:
		panic(fmt.Errorf("unknown value type %v", valueType))
	}
	if valueType == chunkenc.ValNone || t > refTime {
		var ok bool
		t, v, h, ok = it.PeekPrev()
		if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) {
			return 0, 0, nil, false
		}
	}
	if value.IsStaleNaN(v) || (h != nil && value.IsStaleNaN(h.Sum)) {
		return 0, 0, nil, false
	}
	return t, v, h, true
}

var (
	fPointPool zeropool.Pool[[]FPoint]
	hPointPool zeropool.Pool[[]HPoint]
)

// getFPointSlice returns an FPoint slice from the pool if one is available, or
// otherwise a new slice with a capacity of at most maxPointsSliceSize. It is
// called with an estimated size which often can be over-estimated.
func getFPointSlice(sz int) []FPoint {
	if p := fPointPool.Get(); p != nil {
		return p
	}

	if sz > maxPointsSliceSize {
		sz = maxPointsSliceSize
	}

	return make([]FPoint, 0, sz)
}

// putFPointSlice returns an FPoint slice to the pool for reuse.
func putFPointSlice(p []FPoint) {
	if p != nil {
		fPointPool.Put(p[:0])
	}
}

// getHPointSlice returns an HPoint slice from the pool if one is available, or
// otherwise a new slice with a capacity of at most maxPointsSliceSize. It is
// called with an estimated size which often can be over-estimated.
func getHPointSlice(sz int) []HPoint {
	if p := hPointPool.Get(); p != nil {
		return p
	}

	if sz > maxPointsSliceSize {
		sz = maxPointsSliceSize
	}

	return make([]HPoint, 0, sz)
}

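// putHPointSlice returns an HPoint slice to the pool for reuse.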
func putHPointSlice(p []HPoint) {
	if p != nil {
		hPointPool.Put(p[:0])
	}
}

// matrixSelector evaluates a *parser.MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annotations.Annotations) {
	var (
		vs = node.VectorSelector.(*parser.VectorSelector)

		offset = durationMilliseconds(vs.Offset)
		maxt   = ev.startTimestamp - offset
		mint   = maxt - durationMilliseconds(node.Range)
		matrix = make(Matrix, 0, len(vs.Series))

		it = storage.NewBuffer(durationMilliseconds(node.Range))
	)
	ws, err := checkAndExpandSeriesSet(ev.ctx, node)
	if err != nil {
		ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
	}

	var chkIter chunkenc.Iterator
	series := vs.Series
	for i, s := range series {
		if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
			ev.error(err)
		}
		chkIter = s.Iterator(chkIter)
		it.Reset(chkIter)
		ss := Series{
			Metric: series[i].Labels(),
		}

		ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil)
		totalSize := int64(len(ss.Floats)) + int64(totalHPointSize(ss.Histograms))
		ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, totalSize)

		if totalSize > 0 {
			matrix = append(matrix, ss)
		} else {
			putFPointSlice(ss.Floats)
			putHPointSlice(ss.Histograms)
		}
	}
	return matrix, ws
}

// matrixIterSlice populates a matrix vector covering the requested range for a
// single time series, with points retrieved from an iterator.
//
// As an optimization, the matrix vector may already contain points of the same
// time series from the evaluation of an earlier step (with lower mint and maxt
// values). Any such points falling before mint are discarded; points that fall
// into the [mint, maxt] range are retained; only points with later timestamps
// are populated from the iterator.
func (ev *evaluator) matrixIterSlice(
	it *storage.BufferedSeriesIterator, mint, maxt int64,
	floats []FPoint, histograms []HPoint,
) ([]FPoint, []HPoint) {
	mintFloats, mintHistograms := mint, mint
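	// mintFloats and mintHistograms track the earliest timestamp from which new
	// float/histogram points still have to be read from the iterator. They are
	// advanced past any points reused from the previous evaluation step below.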

	// First floats...
	if len(floats) > 0 && floats[len(floats)-1].T >= mint {
		// There is an overlap between previous and current ranges, retain common
		// points. In most such cases:
		//   (a) the overlap is significantly larger than the eval step; and/or
		//   (b) the number of samples is relatively small.
		// so a linear search will be as fast as a binary search.
		var drop int
		for drop = 0; floats[drop].T < mint; drop++ {
		}
		ev.currentSamples -= drop
		copy(floats, floats[drop:])
		floats = floats[:len(floats)-drop]
		// Only append points with timestamps after the last timestamp we have.
		mintFloats = floats[len(floats)-1].T + 1
	} else {
		ev.currentSamples -= len(floats)
		if floats != nil {
			floats = floats[:0]
		}
	}

	// ...then the same for histograms. TODO(beorn7): Use generics?
	if len(histograms) > 0 && histograms[len(histograms)-1].T >= mint {
		// There is an overlap between previous and current ranges, retain common
		// points. In most such cases:
		//   (a) the overlap is significantly larger than the eval step; and/or
		//   (b) the number of samples is relatively small.
		// so a linear search will be as fast as a binary search.
		var drop int
		for drop = 0; histograms[drop].T < mint; drop++ {
		}
		// Subtract the size of the dropped histogram points, mirroring the float
		// handling above, so that the retained points stay accounted for.
		ev.currentSamples -= totalHPointSize(histograms[:drop])
		copy(histograms, histograms[drop:])
		histograms = histograms[:len(histograms)-drop]
		// Only append points with timestamps after the last timestamp we have.
		mintHistograms = histograms[len(histograms)-1].T + 1
	} else {
		ev.currentSamples -= totalHPointSize(histograms)
		if histograms != nil {
			histograms = histograms[:0]
		}
	}

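	// Seek to maxt so that the current sample (if any) is at or after maxt and
	// the buffered iterator holds the preceding samples within the range. The
	// sample exactly at maxt is handled separately after draining the buffer.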
	soughtValueType := it.Seek(maxt)
	if soughtValueType == chunkenc.ValNone {
		if it.Err() != nil {
			ev.error(it.Err())
		}
	}

	buf := it.Buffer()
loop:
	for {
		switch buf.Next() {
		case chunkenc.ValNone:
			break loop
		case chunkenc.ValFloatHistogram, chunkenc.ValHistogram:
			t := buf.AtT()
			// Values in the buffer are guaranteed to be smaller than maxt.
			if t >= mintHistograms {
				if histograms == nil {
					histograms = getHPointSlice(16)
				}
				n := len(histograms)
				if n < cap(histograms) {
					histograms = histograms[:n+1]
				} else {
					histograms = append(histograms, HPoint{})
				}
				histograms[n].T, histograms[n].H = buf.AtFloatHistogram(histograms[n].H)
				if value.IsStaleNaN(histograms[n].H.Sum) {
					histograms = histograms[:n]
					continue loop
				}
				if ev.currentSamples >= ev.maxSamples {
					ev.error(ErrTooManySamples(env))
				}
				ev.currentSamples += histograms[n].size()
			}
		case chunkenc.ValFloat:
			t, f := buf.At()
			if value.IsStaleNaN(f) {
				continue loop
			}
			// Values in the buffer are guaranteed to be smaller than maxt.
			if t >= mintFloats {
				if ev.currentSamples >= ev.maxSamples {
					ev.error(ErrTooManySamples(env))
				}
				ev.currentSamples++
				if floats == nil {
					floats = getFPointSlice(16)
				}
				floats = append(floats, FPoint{T: t, F: f})
			}
		}
	}
	// The sought sample might also be in the range.
	switch soughtValueType {
	case chunkenc.ValFloatHistogram, chunkenc.ValHistogram:
		t, h := it.AtFloatHistogram()
		if t == maxt && !value.IsStaleNaN(h.Sum) {
			if ev.currentSamples >= ev.maxSamples {
				ev.error(ErrTooManySamples(env))
			}
			if histograms == nil {
				histograms = getHPointSlice(16)
			}
			point := HPoint{T: t, H: h}
			histograms = append(histograms, point)
			ev.currentSamples += point.size()
		}
	case chunkenc.ValFloat:
		t, f := it.At()
		if t == maxt && !value.IsStaleNaN(f) {
			if ev.currentSamples >= ev.maxSamples {
				ev.error(ErrTooManySamples(env))
			}
			if floats == nil {
				floats = getFPointSlice(16)
			}
			floats = append(floats, FPoint{T: t, F: f})
			ev.currentSamples++
		}
	}
	ev.samplesStats.UpdatePeak(ev.currentSamples)
	return floats, histograms
}

func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
	if matching.Card != parser.CardManyToMany {
		panic("set operations must only use many-to-many matching")
	}
	if len(lhs) == 0 || len(rhs) == 0 {
		return nil // Short-circuit: AND with nothing is nothing.
	}

	// The set of signatures for the right-hand side Vector.
	rightSigs := map[string]struct{}{}
	// Add all rhs samples to a map so we can easily find matches later.
	for _, sh := range rhsh {
		rightSigs[sh.signature] = struct{}{}
	}

	for i, ls := range lhs {
		// If there's a matching entry in the right-hand side Vector, add the sample.
		if _, ok := rightSigs[lhsh[i].signature]; ok {
			enh.Out = append(enh.Out, ls)
		}
	}
	return enh.Out
}

func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
	switch {
	case matching.Card != parser.CardManyToMany:
		panic("set operations must only use many-to-many matching")
	case len(lhs) == 0: // Short-circuit.
		enh.Out = append(enh.Out, rhs...)
		return enh.Out
	case len(rhs) == 0:
		enh.Out = append(enh.Out, lhs...)
		return enh.Out
	}

	leftSigs := map[string]struct{}{}
	// Add everything from the left-hand-side Vector.
	for i, ls := range lhs {
		leftSigs[lhsh[i].signature] = struct{}{}
		enh.Out = append(enh.Out, ls)
	}
	// Add all right-hand side elements which have not been added from the left-hand side.
	for j, rs := range rhs {
		if _, ok := leftSigs[rhsh[j].signature]; !ok {
			enh.Out = append(enh.Out, rs)
		}
	}
	return enh.Out
}

func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
	if matching.Card != parser.CardManyToMany {
		panic("set operations must only use many-to-many matching")
	}
	// Short-circuit: empty rhs means we will return everything in lhs;
	// empty lhs means we will return empty - don't need to build a map.
	if len(lhs) == 0 || len(rhs) == 0 {
		enh.Out = append(enh.Out, lhs...)
		return enh.Out
	}

	rightSigs := map[string]struct{}{}
	for _, sh := range rhsh {
		rightSigs[sh.signature] = struct{}{}
	}

	for i, ls := range lhs {
		if _, ok := rightSigs[lhsh[i].signature]; !ok {
			enh.Out = append(enh.Out, ls)
		}
	}
	return enh.Out
}

// VectorBinop evaluates a binary operation between two Vectors, excluding set operators.
func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *parser.VectorMatching, returnBool bool, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
	if matching.Card == parser.CardManyToMany {
		panic("many-to-many only allowed for set operators")
	}
	if len(lhs) == 0 || len(rhs) == 0 {
		return nil // Short-circuit: nothing is going to match.
	}

	// The control flow below handles one-to-one or many-to-one matching.
	// For one-to-many, swap sidedness and account for the swap when calculating
	// values.
	if matching.Card == parser.CardOneToMany {
		lhs, rhs = rhs, lhs
		lhsh, rhsh = rhsh, lhsh
	}

	// All samples from the rhs hashed by the matching label/values.
	if enh.rightSigs == nil {
		enh.rightSigs = make(map[string]Sample, len(enh.Out))
	} else {
		for k := range enh.rightSigs {
			delete(enh.rightSigs, k)
		}
	}
	rightSigs := enh.rightSigs

	// Add all rhs samples to a map so we can easily find matches later.
	for i, rs := range rhs {
		sig := rhsh[i].signature
		// The rhs is guaranteed to be the 'one' side. Having multiple samples
		// with the same signature means that the matching is many-to-many.
		if duplSample, found := rightSigs[sig]; found {
			// oneSide represents which side of the operation is the 'one' in the many-to-one relationship.
			oneSide := "right"
			if matching.Card == parser.CardOneToMany {
				oneSide = "left"
			}
			matchedLabels := rs.Metric.MatchLabels(matching.On, matching.MatchingLabels...)
			// Many-to-many matching not allowed.
			ev.errorf("found duplicate series for the match group %s on the %s hand-side of the operation: [%s, %s]"+
				";many-to-many matching not allowed: matching labels must be unique on one side", matchedLabels.String(), oneSide, rs.Metric.String(), duplSample.Metric.String())
		}
		rightSigs[sig] = rs
	}

	// Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one
	// the value is a set of signatures to detect duplicated result elements.
	if enh.matchedSigs == nil {
		enh.matchedSigs = make(map[string]map[uint64]struct{}, len(rightSigs))
	} else {
		for k := range enh.matchedSigs {
			delete(enh.matchedSigs, k)
		}
	}
	matchedSigs := enh.matchedSigs

	// For all lhs samples find a respective rhs sample and perform
	// the binary operation.
	for i, ls := range lhs {
		sig := lhsh[i].signature

		rs, found := rightSigs[sig] // Look for a match in the rhs Vector.
		if !found {
			continue
		}

		// Account for potentially swapped sidedness.
		fl, fr := ls.F, rs.F
		hl, hr := ls.H, rs.H
		if matching.Card == parser.CardOneToMany {
			fl, fr = fr, fl
			hl, hr = hr, hl
		}
		floatValue, histogramValue, keep := vectorElemBinop(op, fl, fr, hl, hr)
		switch {
		case returnBool:
			if keep {
				floatValue = 1.0
			} else {
				floatValue = 0.0
			}
		case !keep:
			continue
		}
		metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh)
		if returnBool {
			metric = enh.DropMetricName(metric)
		}
		insertedSigs, exists := matchedSigs[sig]
		if matching.Card == parser.CardOneToOne {
			if exists {
				ev.errorf("multiple matches for labels: many-to-one matching must be explicit (group_left/group_right)")
			}
			matchedSigs[sig] = nil // Set existence to true.
		} else {
			// In many-to-one matching the grouping labels have to ensure a unique metric
			// for the result Vector. Check whether those labels have already been added for
			// the same matching labels.
			insertSig := metric.Hash()

			if !exists {
				insertedSigs = map[uint64]struct{}{}
				matchedSigs[sig] = insertedSigs
			} else if _, duplicate := insertedSigs[insertSig]; duplicate {
				ev.errorf("multiple matches for labels: grouping labels must ensure unique matches")
			}
			insertedSigs[insertSig] = struct{}{}
		}

		enh.Out = append(enh.Out, Sample{
			Metric: metric,
			F:      floatValue,
			H:      histogramValue,
		})
	}
	return enh.Out
}

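// signatureFunc returns a function that calculates the matching signature of a
// label set: with on=true the signature is built from the given label names
// only, otherwise from all labels except the given names and the metric name.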
func signatureFunc(on bool, b []byte, names ...string) func(labels.Labels) string {
	if on {
		slices.Sort(names)
		return func(lset labels.Labels) string {
			return string(lset.BytesWithLabels(b, names...))
		}
	}
	names = append([]string{labels.MetricName}, names...)
	slices.Sort(names)
	return func(lset labels.Labels) string {
		return string(lset.BytesWithoutLabels(b, names...))
	}
}

// resultMetric returns the metric for the given sample(s) based on the Vector
// binary operation and the matching options.
func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.VectorMatching, enh *EvalNodeHelper) labels.Labels {
	if enh.resultMetric == nil {
		enh.resultMetric = make(map[string]labels.Labels, len(enh.Out))
	}

	enh.resetBuilder(lhs)
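	// Build a cache key from the concatenated label bytes of both sides so that
	// identical lhs/rhs pairs reuse the same result labels across output samples.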
	buf := bytes.NewBuffer(enh.lblResultBuf[:0])
	enh.lblBuf = lhs.Bytes(enh.lblBuf)
	buf.Write(enh.lblBuf)
	enh.lblBuf = rhs.Bytes(enh.lblBuf)
	buf.Write(enh.lblBuf)
	enh.lblResultBuf = buf.Bytes()

	if ret, ok := enh.resultMetric[string(enh.lblResultBuf)]; ok {
		return ret
	}
	str := string(enh.lblResultBuf)

	if shouldDropMetricName(op) {
		enh.lb.Del(labels.MetricName)
	}

	if matching.Card == parser.CardOneToOne {
		if matching.On {
			enh.lb.Keep(matching.MatchingLabels...)
		} else {
			enh.lb.Del(matching.MatchingLabels...)
		}
	}
	for _, ln := range matching.Include {
		// Included labels from the `group_x` modifier are taken from the "one"-side.
		if v := rhs.Get(ln); v != "" {
			enh.lb.Set(ln, v)
		} else {
			enh.lb.Del(ln)
		}
	}

	ret := enh.lb.Labels()
	enh.resultMetric[str] = ret
	return ret
}

// VectorscalarBinop evaluates a binary operation between a Vector and a Scalar.
func (ev *evaluator) VectorscalarBinop(op parser.ItemType, lhs Vector, rhs Scalar, swap, returnBool bool, enh *EvalNodeHelper) Vector {
	for _, lhsSample := range lhs {
		lf, rf := lhsSample.F, rhs.V
		var rh *histogram.FloatHistogram
		lh := lhsSample.H
		// lhs always contains the Vector. If the original position was different,
		// swap the operands for calculating the value.
		if swap {
			lf, rf = rf, lf
			lh, rh = rh, lh
		}
		float, histogram, keep := vectorElemBinop(op, lf, rf, lh, rh)
		// Catch cases where the scalar is the LHS in a scalar-vector comparison operation.
		// We want to always keep the vector element value as the output value, even if it's on the RHS.
		if op.IsComparisonOperator() && swap {
			float = rf
			histogram = rh
		}
		if returnBool {
			if keep {
				float = 1.0
			} else {
				float = 0.0
			}
			keep = true
		}
		if keep {
			lhsSample.F = float
			lhsSample.H = histogram
			if shouldDropMetricName(op) || returnBool {
				lhsSample.Metric = enh.DropMetricName(lhsSample.Metric)
			}
			enh.Out = append(enh.Out, lhsSample)
		}
	}
	return enh.Out
}

func dropMetricName(l labels.Labels) labels.Labels {
	return labels.NewBuilder(l).Del(labels.MetricName).Labels()
}

// scalarBinop evaluates a binary operation between two Scalars.
func scalarBinop(op parser.ItemType, lhs, rhs float64) float64 {
	switch op {
	case parser.ADD:
		return lhs + rhs
	case parser.SUB:
		return lhs - rhs
	case parser.MUL:
		return lhs * rhs
	case parser.DIV:
		return lhs / rhs
	case parser.POW:
		return math.Pow(lhs, rhs)
	case parser.MOD:
		return math.Mod(lhs, rhs)
	case parser.EQLC:
		return btos(lhs == rhs)
	case parser.NEQ:
		return btos(lhs != rhs)
	case parser.GTR:
		return btos(lhs > rhs)
	case parser.LSS:
		return btos(lhs < rhs)
	case parser.GTE:
		return btos(lhs >= rhs)
	case parser.LTE:
		return btos(lhs <= rhs)
	case parser.ATAN2:
		return math.Atan2(lhs, rhs)
	}
	panic(fmt.Errorf("operator %q not allowed for Scalar operations", op))
}

// vectorElemBinop evaluates a binary operation between two Vector elements.
func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool) {
	switch op {
	case parser.ADD:
		if hlhs != nil && hrhs != nil {
			// The histogram being added must have the larger schema
			// code (i.e. the higher resolution).
			if hrhs.Schema >= hlhs.Schema {
				return 0, hlhs.Copy().Add(hrhs).Compact(0), true
			}
			return 0, hrhs.Copy().Add(hlhs).Compact(0), true
		}
		return lhs + rhs, nil, true
	case parser.SUB:
		if hlhs != nil && hrhs != nil {
			// The histogram being subtracted must have the larger schema
			// code (i.e. the higher resolution).
			if hrhs.Schema >= hlhs.Schema {
				return 0, hlhs.Copy().Sub(hrhs).Compact(0), true
			}
			return 0, hrhs.Copy().Mul(-1).Add(hlhs).Compact(0), true
		}
		return lhs - rhs, nil, true
	case parser.MUL:
		if hlhs != nil && hrhs == nil {
			return 0, hlhs.Copy().Mul(rhs), true
		}
		if hlhs == nil && hrhs != nil {
			return 0, hrhs.Copy().Mul(lhs), true
		}
		return lhs * rhs, nil, true
	case parser.DIV:
		if hlhs != nil && hrhs == nil {
			return 0, hlhs.Copy().Div(rhs), true
		}
		return lhs / rhs, nil, true
	case parser.POW:
		return math.Pow(lhs, rhs), nil, true
	case parser.MOD:
		return math.Mod(lhs, rhs), nil, true
	case parser.EQLC:
		return lhs, nil, lhs == rhs
	case parser.NEQ:
		return lhs, nil, lhs != rhs
	case parser.GTR:
		return lhs, nil, lhs > rhs
	case parser.LSS:
		return lhs, nil, lhs < rhs
	case parser.GTE:
		return lhs, nil, lhs >= rhs
	case parser.LTE:
		return lhs, nil, lhs <= rhs
	case parser.ATAN2:
		return math.Atan2(lhs, rhs), nil, true
	}
	panic(fmt.Errorf("operator %q not allowed for operations between Vectors", op))
}

type groupedAggregation struct {
	hasFloat       bool // Has at least 1 float64 sample aggregated.
	hasHistogram   bool // Has at least 1 histogram sample aggregated.
	labels         labels.Labels
	floatValue     float64
	histogramValue *histogram.FloatHistogram
	floatMean      float64
	histogramMean  *histogram.FloatHistogram
	groupCount     int
	heap           vectorByValueHeap
	reverseHeap    vectorByReverseValueHeap
}

// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
// must be sorted.
func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
	op := e.Op
	without := e.Without
	var annos annotations.Annotations
	result := map[uint64]*groupedAggregation{}
	orderedResult := []*groupedAggregation{}
	var k int64
	if op == parser.TOPK || op == parser.BOTTOMK {
		f := param.(float64)
		if !convertibleToInt64(f) {
			ev.errorf("Scalar value %v overflows int64", f)
		}
		k = int64(f)
		if k < 1 {
			return Vector{}, annos
		}
	}
	var q float64
	if op == parser.QUANTILE {
		q = param.(float64)
	}
	var valueLabel string
	var recomputeGroupingKey bool
	if op == parser.COUNT_VALUES {
		valueLabel = param.(string)
		if !model.LabelName(valueLabel).IsValid() {
			ev.errorf("invalid label name %q", valueLabel)
		}
		if !without {
			// We're changing the grouping labels so we have to ensure they're still sorted
			// and we have to flag to recompute the grouping key. Considering the count_values()
			// operator is less frequently used than other aggregations, we're fine having to
			// re-compute the grouping key on each step for this case.
			grouping = append(grouping, valueLabel)
			slices.Sort(grouping)
			recomputeGroupingKey = true
		}
	}

	var buf []byte
	for si, s := range vec {
		metric := s.Metric

		if op == parser.COUNT_VALUES {
			enh.resetBuilder(metric)
			enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64))
			metric = enh.lb.Labels()

			// We've changed the metric so we have to recompute the grouping key.
			recomputeGroupingKey = true
		}

		// We can use the pre-computed grouping key unless grouping labels have changed.
		var groupingKey uint64
		if !recomputeGroupingKey {
			groupingKey = seriesHelper[si].groupingKey
		} else {
			groupingKey, buf = generateGroupingKey(metric, grouping, without, buf)
		}

		group, ok := result[groupingKey]
		// Add a new group if it doesn't exist.
		if !ok {
			var m labels.Labels
			enh.resetBuilder(metric)
			switch {
			case without:
				enh.lb.Del(grouping...)
				enh.lb.Del(labels.MetricName)
				m = enh.lb.Labels()
			case len(grouping) > 0:
				enh.lb.Keep(grouping...)
				m = enh.lb.Labels()
			default:
				m = labels.EmptyLabels()
			}
			newAgg := &groupedAggregation{
				labels:     m,
				floatValue: s.F,
				floatMean:  s.F,
				groupCount: 1,
			}
			switch {
			case s.H == nil:
				newAgg.hasFloat = true
			case op == parser.SUM:
				newAgg.histogramValue = s.H.Copy()
				newAgg.hasHistogram = true
			case op == parser.AVG:
				newAgg.histogramMean = s.H.Copy()
				newAgg.hasHistogram = true
			case op == parser.STDVAR || op == parser.STDDEV:
				newAgg.groupCount = 0
			}

			result[groupingKey] = newAgg
			orderedResult = append(orderedResult, newAgg)

			inputVecLen := int64(len(vec))
			resultSize := k
			switch {
			case k > inputVecLen:
				resultSize = inputVecLen
			case k == 0:
				resultSize = 1
			}
			switch op {
			case parser.STDVAR, parser.STDDEV:
				result[groupingKey].floatValue = 0
			case parser.TOPK, parser.QUANTILE:
				result[groupingKey].heap = make(vectorByValueHeap, 1, resultSize)
				result[groupingKey].heap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
			case parser.BOTTOMK:
				result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 1, resultSize)
				result[groupingKey].reverseHeap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
			case parser.GROUP:
				result[groupingKey].floatValue = 1
			}
			continue
		}

		switch op {
		case parser.SUM:
			if s.H != nil {
				group.hasHistogram = true
				if group.histogramValue != nil {
					// The histogram being added must have
					// an equal or larger schema.
					if s.H.Schema >= group.histogramValue.Schema {
						group.histogramValue.Add(s.H)
					} else {
						group.histogramValue = s.H.Copy().Add(group.histogramValue)
					}
				}
				// Otherwise the aggregation contained floats
				// previously and will be invalid anyway. No
				// point in copying the histogram in that case.
			} else {
				group.hasFloat = true
				group.floatValue += s.F
			}

		case parser.AVG:
			group.groupCount++
			if s.H != nil {
				group.hasHistogram = true
				if group.histogramMean != nil {
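					// Incremental mean: mean += (s.H - mean) / groupCount, with each
					// operand divided by groupCount first (mirroring the float path
					// below) before being combined with histogram arithmetic.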
					left := s.H.Copy().Div(float64(group.groupCount))
					right := group.histogramMean.Copy().Div(float64(group.groupCount))
					// The histogram being added/subtracted must have
					// an equal or larger schema.
					if s.H.Schema >= group.histogramMean.Schema {
						toAdd := right.Mul(-1).Add(left)
						group.histogramMean.Add(toAdd)
					} else {
						toAdd := left.Sub(right)
						group.histogramMean = toAdd.Add(group.histogramMean)
					}
				}
				// Otherwise the aggregation contained floats
				// previously and will be invalid anyway. No
				// point in copying the histogram in that case.
			} else {
				group.hasFloat = true
				if math.IsInf(group.floatMean, 0) {
					if math.IsInf(s.F, 0) && (group.floatMean > 0) == (s.F > 0) {
						// The `floatMean` and `s.F` values are `Inf` of the same sign.  They
						// can't be subtracted, but the value of `floatMean` is correct
						// already.
						break
					}
					if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) {
						// At this stage, the mean is infinite. If the added
						// value is neither Inf nor NaN, we can keep that mean
						// value.
						// This is required because our calculation below removes
						// the mean value, which would look like Inf += x - Inf and
						// end up as a NaN.
						break
					}
				}
				// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
				group.floatMean += s.F/float64(group.groupCount) - group.floatMean/float64(group.groupCount)
			}

		case parser.GROUP:
			// Do nothing. Required to avoid the panic in `default:` below.

		case parser.MAX:
			if group.floatValue < s.F || math.IsNaN(group.floatValue) {
				group.floatValue = s.F
			}

		case parser.MIN:
			if group.floatValue > s.F || math.IsNaN(group.floatValue) {
				group.floatValue = s.F
			}

		case parser.COUNT, parser.COUNT_VALUES:
			group.groupCount++

		case parser.STDVAR, parser.STDDEV:
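			// Welford's online algorithm: floatMean holds the running mean and
			// floatValue accumulates the sum of squared deviations (M2), which is
			// divided by the group count when the result is materialized below.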
			if s.H == nil { // Ignore native histograms.
				group.groupCount++
				delta := s.F - group.floatMean
				group.floatMean += delta / float64(group.groupCount)
				group.floatValue += delta * (s.F - group.floatMean)
			}

		case parser.TOPK:
			// We build a heap of up to k elements, with the smallest element at heap[0].
			switch {
			case int64(len(group.heap)) < k:
				heap.Push(&group.heap, &Sample{
					F:      s.F,
					Metric: s.Metric,
				})
			case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
				// This new element is bigger than the previous smallest element - overwrite that.
				group.heap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
				if k > 1 {
					heap.Fix(&group.heap, 0) // Maintain the heap invariant.
				}
			}

		case parser.BOTTOMK:
			// We build a heap of up to k elements, with the biggest element at heap[0].
			switch {
			case int64(len(group.reverseHeap)) < k:
				heap.Push(&group.reverseHeap, &Sample{
					F:      s.F,
					Metric: s.Metric,
				})
			case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)):
				// This new element is smaller than the previous biggest element - overwrite that.
				group.reverseHeap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
				if k > 1 {
					heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant.
				}
			}

		case parser.QUANTILE:
			group.heap = append(group.heap, s)

		default:
			panic(fmt.Errorf("expected aggregation operator but got %q", op))
		}
	}

	// Construct the result Vector from the aggregated groups.
	for _, aggr := range orderedResult {
		switch op {
		case parser.AVG:
			if aggr.hasFloat && aggr.hasHistogram {
				// We cannot aggregate a histogram sample with a float64 sample.
				metricName := aggr.labels.Get(labels.MetricName)
				annos.Add(annotations.NewMixedFloatsHistogramsWarning(metricName, e.Expr.PositionRange()))
				continue
			}
			if aggr.hasHistogram {
				aggr.histogramValue = aggr.histogramMean.Compact(0)
			} else {
				aggr.floatValue = aggr.floatMean
			}

		case parser.COUNT, parser.COUNT_VALUES:
			aggr.floatValue = float64(aggr.groupCount)

		case parser.STDVAR:
			aggr.floatValue /= float64(aggr.groupCount)

		case parser.STDDEV:
			aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount))

		case parser.TOPK:
			// The heap keeps the lowest value on top, so reverse it.
			if len(aggr.heap) > 1 {
				sort.Sort(sort.Reverse(aggr.heap))
			}
			for _, v := range aggr.heap {
				enh.Out = append(enh.Out, Sample{
					Metric: v.Metric,
					F:      v.F,
				})
			}
			continue // Bypass default append.

		case parser.BOTTOMK:
			// The heap keeps the highest value on top, so reverse it.
			if len(aggr.reverseHeap) > 1 {
				sort.Sort(sort.Reverse(aggr.reverseHeap))
			}
			for _, v := range aggr.reverseHeap {
				enh.Out = append(enh.Out, Sample{
					Metric: v.Metric,
					F:      v.F,
				})
			}
			continue // Bypass default append.

		case parser.QUANTILE:
			if math.IsNaN(q) || q < 0 || q > 1 {
				annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange()))
			}
			aggr.floatValue = quantile(q, aggr.heap)

		case parser.SUM:
			if aggr.hasFloat && aggr.hasHistogram {
				// We cannot aggregate a histogram sample with a float64 sample.
				metricName := aggr.labels.Get(labels.MetricName)
				annos.Add(annotations.NewMixedFloatsHistogramsWarning(metricName, e.Expr.PositionRange()))
				continue
			}
			if aggr.hasHistogram {
				aggr.histogramValue.Compact(0)
			}
		default:
			// For other aggregations, we already have the right value.
		}

		enh.Out = append(enh.Out, Sample{
			Metric: aggr.labels,
			F:      aggr.floatValue,
			H:      aggr.histogramValue,
		})
	}
	return enh.Out, annos
}

// generateGroupingKey builds and returns the grouping key for the given metric
// and grouping labels.
func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) {
	if without {
		return metric.HashWithoutLabels(buf, grouping...)
	}

	if len(grouping) == 0 {
		// No need to generate any hash if there are no grouping labels.
		return 0, buf
	}

	return metric.HashForLabels(buf, grouping...)
}

// btos returns 1 if b is true, 0 otherwise.
func btos(b bool) float64 {
	if b {
		return 1
	}
	return 0
}

// shouldDropMetricName returns whether the metric name should be dropped in the
// result of the op operation.
func shouldDropMetricName(op parser.ItemType) bool {
	switch op {
	case parser.ADD, parser.SUB, parser.DIV, parser.MUL, parser.POW, parser.MOD, parser.ATAN2:
		return true
	default:
		return false
	}
}

// NewOriginContext returns a new context with data about the origin attached.
func NewOriginContext(ctx context.Context, data map[string]interface{}) context.Context {
	return context.WithValue(ctx, QueryOrigin{}, data)
}

func formatDate(t time.Time) string {
	return t.UTC().Format("2006-01-02T15:04:05.000Z07:00")
}

// unwrapParenExpr does the AST equivalent of removing parentheses around an expression.
func unwrapParenExpr(e *parser.Expr) {
	for {
		if p, ok := (*e).(*parser.ParenExpr); ok {
			*e = p.Expr
		} else {
			break
		}
	}
}

func unwrapStepInvariantExpr(e parser.Expr) parser.Expr {
	if p, ok := e.(*parser.StepInvariantExpr); ok {
		return p.Expr
	}
	return e
}

// PreprocessExpr wraps all possible step-invariant parts of the given expression with
// StepInvariantExpr. It also resolves the start() and end() preprocessors.
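// For example, in `sum(foo @ end()) + bar`, the left-hand operand does not
// depend on the evaluation step once end() has been resolved, so it is wrapped
// in a StepInvariantExpr and evaluated only once, while `bar` is still
// evaluated at every step.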
func PreprocessExpr(expr parser.Expr, start, end time.Time) parser.Expr {
	isStepInvariant := preprocessExprHelper(expr, start, end)
	if isStepInvariant {
		return newStepInvariantExpr(expr)
	}
	return expr
}

// preprocessExprHelper wraps the child nodes of the expression with a StepInvariantExpr
// wherever they are step invariant. The returned boolean is true if the passed expression
// itself qualifies to be wrapped by a StepInvariantExpr.
// It also resolves the start() and end() preprocessors.
func preprocessExprHelper(expr parser.Expr, start, end time.Time) bool {
	switch n := expr.(type) {
	case *parser.VectorSelector:
		switch n.StartOrEnd {
		case parser.START:
			n.Timestamp = makeInt64Pointer(timestamp.FromTime(start))
		case parser.END:
			n.Timestamp = makeInt64Pointer(timestamp.FromTime(end))
		}
		return n.Timestamp != nil

	case *parser.AggregateExpr:
		return preprocessExprHelper(n.Expr, start, end)

	case *parser.BinaryExpr:
		isInvariant1, isInvariant2 := preprocessExprHelper(n.LHS, start, end), preprocessExprHelper(n.RHS, start, end)
		if isInvariant1 && isInvariant2 {
			return true
		}

		if isInvariant1 {
			n.LHS = newStepInvariantExpr(n.LHS)
		}
		if isInvariant2 {
			n.RHS = newStepInvariantExpr(n.RHS)
		}

		return false

	case *parser.Call:
		_, ok := AtModifierUnsafeFunctions[n.Func.Name]
		isStepInvariant := !ok
		isStepInvariantSlice := make([]bool, len(n.Args))
		for i := range n.Args {
			isStepInvariantSlice[i] = preprocessExprHelper(n.Args[i], start, end)
			isStepInvariant = isStepInvariant && isStepInvariantSlice[i]
		}

		if isStepInvariant {
			// The function and all arguments are step invariant.
			return true
		}

		for i, isi := range isStepInvariantSlice {
			if isi {
				n.Args[i] = newStepInvariantExpr(n.Args[i])
			}
		}
		return false

	case *parser.MatrixSelector:
		return preprocessExprHelper(n.VectorSelector, start, end)

	case *parser.SubqueryExpr:
		// Since we adjust the offset for the @ modifier evaluation,
		// it gets tricky to adjust it for every subquery step.
		// Hence we wrap the inside of the subquery irrespective of
		// @ on the subquery itself (given it is also step invariant), so that
		// it is evaluated only once w.r.t. the start time of the subquery.
		isInvariant := preprocessExprHelper(n.Expr, start, end)
		if isInvariant {
			n.Expr = newStepInvariantExpr(n.Expr)
		}
		switch n.StartOrEnd {
		case parser.START:
			n.Timestamp = makeInt64Pointer(timestamp.FromTime(start))
		case parser.END:
			n.Timestamp = makeInt64Pointer(timestamp.FromTime(end))
		}
		return n.Timestamp != nil

	case *parser.ParenExpr:
		return preprocessExprHelper(n.Expr, start, end)

	case *parser.UnaryExpr:
		return preprocessExprHelper(n.Expr, start, end)

	case *parser.StringLiteral, *parser.NumberLiteral:
		return true
	}

	panic(fmt.Sprintf("found unexpected node %#v", expr))
}

func newStepInvariantExpr(expr parser.Expr) parser.Expr {
	return &parser.StepInvariantExpr{Expr: expr}
}

// setOffsetForAtModifier modifies the offset of vector and matrix selectors
// and subqueries in the tree to accommodate the timestamp of the @ modifier.
// The offset is adjusted w.r.t. the given evaluation time.
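// For example, when the evaluation time is 100s, a selector `foo @ 50` gets an
// additional offset of 50s (the evaluation time minus the @ timestamp) on top
// of its original offset, so that it reads the data at 50s.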
func setOffsetForAtModifier(evalTime int64, expr parser.Expr) {
	getOffset := func(ts *int64, originalOffset time.Duration, path []parser.Node) time.Duration {
		if ts == nil {
			return originalOffset
		}

		subqOffset, _, subqTs := subqueryTimes(path)
		if subqTs != nil {
			subqOffset += time.Duration(evalTime-*subqTs) * time.Millisecond
		}

		offsetForTs := time.Duration(evalTime-*ts) * time.Millisecond
		offsetDiff := offsetForTs - subqOffset
		return originalOffset + offsetDiff
	}

	parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
		switch n := node.(type) {
		case *parser.VectorSelector:
			n.Offset = getOffset(n.Timestamp, n.OriginalOffset, path)

		case *parser.MatrixSelector:
			vs := n.VectorSelector.(*parser.VectorSelector)
			vs.Offset = getOffset(vs.Timestamp, vs.OriginalOffset, path)

		case *parser.SubqueryExpr:
			n.Offset = getOffset(n.Timestamp, n.OriginalOffset, path)
		}
		return nil
	})
}

func makeInt64Pointer(val int64) *int64 {
	valp := new(int64)
	*valp = val
	return valp
}