This commit is contained in:
Junang Li 2024-07-21 23:26:02 -04:00
parent 47cfc39545
commit e6fe0e3302
10 changed files with 71 additions and 71 deletions

View file

@ -109,7 +109,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s
return false return false
} }
raw, err := metricsData.Marshal() raw, err := metricsData.MarshalVT()
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err) fmt.Fprintln(os.Stderr, " FAILED:", err)
return false return false

View file

@ -46,10 +46,10 @@ func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []*Label) labels.L
// FromLabels transforms labels into prompb labels. The buffer slice // FromLabels transforms labels into prompb labels. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels. // will be used to avoid allocations if it is big enough to store the labels.
func FromLabels(lbls labels.Labels, buf []Label) []Label { func FromLabels(lbls labels.Labels, buf []*Label) []*Label {
result := buf[:0] result := buf[:0]
lbls.Range(func(l labels.Label) { lbls.Range(func(l labels.Label) {
result = append(result, Label{ result = append(result, &Label{
Name: l.Name, Name: l.Name,
Value: l.Value, Value: l.Value,
}) })
@ -146,8 +146,8 @@ func deltasToCounts(deltas []int64) []float64 {
} }
// FromIntHistogram returns remote Histogram from the integer Histogram. // FromIntHistogram returns remote Histogram from the integer Histogram.
func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram { func FromIntHistogram(timestamp int64, h *histogram.Histogram) *Histogram {
return Histogram{ return &Histogram{
Count: &Histogram_CountInt{CountInt: h.Count}, Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum, Sum: h.Sum,
Schema: h.Schema, Schema: h.Schema,
@ -163,8 +163,8 @@ func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
} }
// FromFloatHistogram returns remote Histogram from the float Histogram. // FromFloatHistogram returns remote Histogram from the float Histogram.
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram { func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) *Histogram {
return Histogram{ return &Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count}, Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum, Sum: fh.Sum,
Schema: fh.Schema, Schema: fh.Schema,

View file

@ -160,8 +160,8 @@ func deltasToCounts(deltas []int64) []float64 {
} }
// FromIntHistogram returns remote Histogram from the integer Histogram. // FromIntHistogram returns remote Histogram from the integer Histogram.
func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram { func FromIntHistogram(timestamp int64, h *histogram.Histogram) *Histogram {
return Histogram{ return &Histogram{
Count: &Histogram_CountInt{CountInt: h.Count}, Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum, Sum: h.Sum,
Schema: h.Schema, Schema: h.Schema,
@ -178,8 +178,8 @@ func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
} }
// FromFloatHistogram returns remote Histogram from the float Histogram. // FromFloatHistogram returns remote Histogram from the float Histogram.
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram { func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) *Histogram {
return Histogram{ return &Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count}, Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum, Sum: fh.Sum,
Schema: fh.Schema, Schema: fh.Schema,

View file

@ -130,8 +130,8 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
iter = series.Iterator(iter) iter = series.Iterator(iter)
var ( var (
samples []prompb.Sample samples []*prompb.Sample
histograms []prompb.Histogram histograms []*prompb.Histogram
) )
for valType := iter.Next(); valType != chunkenc.ValNone; valType = iter.Next() { for valType := iter.Next(); valType != chunkenc.ValNone; valType = iter.Next() {
@ -146,7 +146,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
switch valType { switch valType {
case chunkenc.ValFloat: case chunkenc.ValFloat:
ts, val := iter.At() ts, val := iter.At()
samples = append(samples, prompb.Sample{ samples = append(samples, &prompb.Sample{
Timestamp: ts, Timestamp: ts,
Value: val, Value: val,
}) })
@ -221,13 +221,13 @@ func StreamChunkedReadResponses(
stream io.Writer, stream io.Writer,
queryIndex int64, queryIndex int64,
ss storage.ChunkSeriesSet, ss storage.ChunkSeriesSet,
sortedExternalLabels []prompb.Label, sortedExternalLabels []*prompb.Label,
maxBytesInFrame int, maxBytesInFrame int,
marshalPool *sync.Pool, marshalPool *sync.Pool,
) (annotations.Annotations, error) { ) (annotations.Annotations, error) {
var ( var (
chks []prompb.Chunk chks []*prompb.Chunk
lbls []prompb.Label lbls []*prompb.Label
iter chunks.Iterator iter chunks.Iterator
) )
@ -238,7 +238,7 @@ func StreamChunkedReadResponses(
maxDataLength := maxBytesInFrame maxDataLength := maxBytesInFrame
for _, lbl := range lbls { for _, lbl := range lbls {
maxDataLength -= lbl.Size() maxDataLength -= lbl.SizeVT()
} }
frameBytesLeft := maxDataLength frameBytesLeft := maxDataLength
@ -253,13 +253,13 @@ func StreamChunkedReadResponses(
} }
// Cut the chunk. // Cut the chunk.
chks = append(chks, prompb.Chunk{ chks = append(chks, &prompb.Chunk{
MinTimeMs: chk.MinTime, MinTimeMs: chk.MinTime,
MaxTimeMs: chk.MaxTime, MaxTimeMs: chk.MaxTime,
Type: prompb.Chunk_Encoding(chk.Chunk.Encoding()), Type: prompb.Chunk_Encoding(chk.Chunk.Encoding()),
Data: chk.Chunk.Bytes(), Data: chk.Chunk.Bytes(),
}) })
frameBytesLeft -= chks[len(chks)-1].Size() frameBytesLeft -= chks[len(chks)-1].SizeVT()
// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
isNext = iter.Next() isNext = iter.Next()
@ -297,8 +297,8 @@ func StreamChunkedReadResponses(
// MergeLabels merges two sets of sorted proto labels, preferring those in // MergeLabels merges two sets of sorted proto labels, preferring those in
// primary to those in secondary when there is an overlap. // primary to those in secondary when there is an overlap.
func MergeLabels(primary, secondary []prompb.Label) []prompb.Label { func MergeLabels(primary, secondary []*prompb.Label) []*prompb.Label {
result := make([]prompb.Label, 0, len(primary)+len(secondary)) result := make([]*prompb.Label, 0, len(primary)+len(secondary))
i, j := 0, 0 i, j := 0, 0
for i < len(primary) && j < len(secondary) { for i < len(primary) && j < len(secondary) {
switch { switch {
@ -366,8 +366,8 @@ func (c *concreteSeriesSet) Warnings() annotations.Annotations { return nil }
// concreteSeries implements storage.Series. // concreteSeries implements storage.Series.
type concreteSeries struct { type concreteSeries struct {
labels labels.Labels labels labels.Labels
floats []prompb.Sample floats []*prompb.Sample
histograms []prompb.Histogram histograms []*prompb.Histogram
} }
func (c *concreteSeries) Labels() labels.Labels { func (c *concreteSeries) Labels() labels.Labels {
@ -439,7 +439,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
if c.series.floats[c.floatsCur].Timestamp <= c.series.histograms[c.histogramsCur].Timestamp { if c.series.floats[c.floatsCur].Timestamp <= c.series.histograms[c.histogramsCur].Timestamp {
c.curValType = chunkenc.ValFloat c.curValType = chunkenc.ValFloat
} else { } else {
c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur]) c.curValType = getHistogramValType(c.series.histograms[c.histogramsCur])
} }
// When the timestamps do not overlap the cursor for the non-selected sample type has advanced too // When the timestamps do not overlap the cursor for the non-selected sample type has advanced too
// far; we decrement it back down here. // far; we decrement it back down here.
@ -453,7 +453,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
case c.floatsCur < len(c.series.floats): case c.floatsCur < len(c.series.floats):
c.curValType = chunkenc.ValFloat c.curValType = chunkenc.ValFloat
case c.histogramsCur < len(c.series.histograms): case c.histogramsCur < len(c.series.histograms):
c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur]) c.curValType = getHistogramValType(c.series.histograms[c.histogramsCur])
} }
return c.curValType return c.curValType
} }
@ -542,7 +542,7 @@ func (c *concreteSeriesIterator) Err() error {
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read, // validateLabelsAndMetricName validates the label names/values and metric names returned from remote read,
// also making sure that there are no labels with duplicate names. // also making sure that there are no labels with duplicate names.
func validateLabelsAndMetricName(ls []prompb.Label) error { func validateLabelsAndMetricName(ls []*prompb.Label) error {
for i, l := range ls { for i, l := range ls {
if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) { if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) {
return fmt.Errorf("invalid metric name: %v", l.Value) return fmt.Errorf("invalid metric name: %v", l.Value)

View file

@ -96,7 +96,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa
return return
} }
createdLabels := make([]prompb.Label, len(lbls)) createdLabels := make([]*prompb.Label, len(lbls))
copy(createdLabels, lbls) copy(createdLabels, lbls)
for i, l := range createdLabels { for i, l := range createdLabels {
if l.Name == model.MetricNameLabel { if l.Name == model.MetricNameLabel {

View file

@ -22,18 +22,18 @@ import (
) )
// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format. // TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format.
func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries { func (c *PrometheusConverter) TimeSeries() []*prompb.TimeSeries {
conflicts := 0 conflicts := 0
for _, ts := range c.conflicts { for _, ts := range c.conflicts {
conflicts += len(ts) conflicts += len(ts)
} }
allTS := make([]prompb.TimeSeries, 0, len(c.unique)+conflicts) allTS := make([]*prompb.TimeSeries, 0, len(c.unique)+conflicts)
for _, ts := range c.unique { for _, ts := range c.unique {
allTS = append(allTS, *ts) allTS = append(allTS, ts)
} }
for _, cTS := range c.conflicts { for _, cTS := range c.conflicts {
for _, ts := range cTS { for _, ts := range cTS {
allTS = append(allTS, *ts) allTS = append(allTS, ts)
} }
} }

View file

@ -547,9 +547,9 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr
} }
// 1.X will still get metadata in batches. // 1.X will still get metadata in batches.
mm := make([]prompb.MetricMetadata, 0, len(metadata)) mm := make([]*prompb.MetricMetadata, 0, len(metadata))
for _, entry := range metadata { for _, entry := range metadata {
mm = append(mm, prompb.MetricMetadata{ mm = append(mm, &prompb.MetricMetadata{
MetricFamilyName: entry.Metric, MetricFamilyName: entry.Metric,
Help: entry.Help, Help: entry.Help,
Type: prompb.FromMetadataType(entry.Type), Type: prompb.FromMetadataType(entry.Type),
@ -572,7 +572,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr
} }
} }
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples (v1 flow). // Build the WriteRequest with no samples (v1 flow).
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, t.enc) req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, t.enc)
if err != nil { if err != nil {
@ -631,8 +631,8 @@ func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) boo
return sampleTs.Before(limitTs) return sampleTs.Before(limitTs)
} }
func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool { func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts *prompb.TimeSeries) bool {
return func(ts prompb.TimeSeries) bool { return func(ts *prompb.TimeSeries) bool {
if sampleAgeLimit == 0 { if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age. // If sampleAgeLimit is unset, then we never skip samples due to their age.
return false return false
@ -661,8 +661,8 @@ func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sam
} }
} }
func isV2TimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts writev2.TimeSeries) bool { func isV2TimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts *writev2.TimeSeries) bool {
return func(ts writev2.TimeSeries) bool { return func(ts *writev2.TimeSeries) bool {
if sampleAgeLimit == 0 { if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age. // If sampleAgeLimit is unset, then we never skip samples due to their age.
return false return false
@ -1535,16 +1535,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
// TODO: Dry all of this, we should make an interface/generic for the timeseries type. // TODO: Dry all of this, we should make an interface/generic for the timeseries type.
batchQueue := queue.Chan() batchQueue := queue.Chan()
pendingData := make([]prompb.TimeSeries, max) pendingData := make([]*prompb.TimeSeries, max)
for i := range pendingData { for i := range pendingData {
pendingData[i].Samples = []prompb.Sample{{}} pendingData[i].Samples = []*prompb.Sample{{}}
if s.qm.sendExemplars { if s.qm.sendExemplars {
pendingData[i].Exemplars = []prompb.Exemplar{{}} pendingData[i].Exemplars = []*prompb.Exemplar{{}}
} }
} }
pendingDataV2 := make([]writev2.TimeSeries, max) pendingDataV2 := make([]*writev2.TimeSeries, max)
for i := range pendingDataV2 { for i := range pendingDataV2 {
pendingDataV2[i].Samples = []writev2.Sample{{}} pendingDataV2[i].Samples = []*writev2.Sample{{}}
} }
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1619,7 +1619,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
} }
} }
func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) { func populateTimeSeries(batch []timeSeries, pendingData []*prompb.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms int var nPendingSamples, nPendingExemplars, nPendingHistograms int
for nPending, d := range batch { for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0] pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
@ -1637,13 +1637,13 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
switch d.sType { switch d.sType {
case tSample: case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ pendingData[nPending].Samples = append(pendingData[nPending].Samples, &prompb.Sample{
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
}) })
nPendingSamples++ nPendingSamples++
case tExemplar: case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{ pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, &prompb.Exemplar{
Labels: prompb.FromLabels(d.exemplarLabels, nil), Labels: prompb.FromLabels(d.exemplarLabels, nil),
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
@ -1660,7 +1660,7 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
return nPendingSamples, nPendingExemplars, nPendingHistograms return nPendingSamples, nPendingExemplars, nPendingHistograms
} }
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error { func (s *shards) sendSamples(ctx context.Context, samples []*prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) error {
begin := time.Now() begin := time.Now()
rs, err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, enc) rs, err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, enc)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, rs, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, rs, time.Since(begin))
@ -1669,7 +1669,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// TODO(bwplotka): DRY this (have one logic for both v1 and v2). // TODO(bwplotka): DRY this (have one logic for both v1 and v2).
// See https://github.com/prometheus/prometheus/issues/14409 // See https://github.com/prometheus/prometheus/issues/14409
func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error { func (s *shards) sendV2Samples(ctx context.Context, samples []*writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
begin := time.Now() begin := time.Now()
rs, err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc) rs, err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, enc)
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, rs, time.Since(begin)) s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, rs, time.Since(begin))
@ -1713,7 +1713,7 @@ func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exempl
} }
// sendSamples to the remote storage with backoff for recoverable errors. // sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) (WriteResponseStats, error) { func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []*prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, enc Compression) (WriteResponseStats, error) {
// Build the WriteRequest with no metadata. // Build the WriteRequest with no metadata.
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, enc) req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, enc)
s.qm.buildRequestLimitTimestamp.Store(lowest) s.qm.buildRequestLimitTimestamp.Store(lowest)
@ -1827,7 +1827,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
} }
// sendV2Samples to the remote storage with backoff for recoverable errors. // sendV2Samples to the remote storage with backoff for recoverable errors.
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) (WriteResponseStats, error) { func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []*writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) (WriteResponseStats, error) {
// Build the WriteRequest with no metadata. // Build the WriteRequest with no metadata.
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc) req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc)
s.qm.buildRequestLimitTimestamp.Store(lowest) s.qm.buildRequestLimitTimestamp.Store(lowest)
@ -1939,7 +1939,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
return accumulatedStats, err return accumulatedStats, err
} }
func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) { func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []*writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
for nPending, d := range batch { for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0] pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
@ -1964,13 +1964,13 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
pendingData[nPending].LabelsRefs = symbolTable.SymbolizeLabels(d.seriesLabels, pendingData[nPending].LabelsRefs) pendingData[nPending].LabelsRefs = symbolTable.SymbolizeLabels(d.seriesLabels, pendingData[nPending].LabelsRefs)
switch d.sType { switch d.sType {
case tSample: case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, writev2.Sample{ pendingData[nPending].Samples = append(pendingData[nPending].Samples, &writev2.Sample{
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
}) })
nPendingSamples++ nPendingSamples++
case tExemplar: case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, writev2.Exemplar{ pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, &writev2.Exemplar{
LabelsRefs: symbolTable.SymbolizeLabels(d.exemplarLabels, nil), // TODO: optimize, reuse slice LabelsRefs: symbolTable.SymbolizeLabels(d.exemplarLabels, nil), // TODO: optimize, reuse slice
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
@ -2079,7 +2079,7 @@ func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, upda
} }
} }
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) { func buildTimeSeries(timeSeries []*prompb.TimeSeries, filter func(*prompb.TimeSeries) bool) (int64, int64, []*prompb.TimeSeries, int, int, int) {
var highest int64 var highest int64
var lowest int64 var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int var droppedSamples, droppedExemplars, droppedHistograms int
@ -2147,7 +2147,7 @@ func compressPayload(tmpbuf *[]byte, inp []byte, enc Compression) (compressed []
} }
} }
func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) { func buildWriteRequest(logger log.Logger, timeSeries []*prompb.TimeSeries, metadata []*prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(*prompb.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries, highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter) droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
@ -2185,7 +2185,7 @@ func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metada
return compressed, highest, lowest, nil return compressed, highest, lowest, nil
} }
func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) { func buildV2WriteRequest(logger log.Logger, samples []*writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(*writev2.TimeSeries) bool, enc Compression) (compressed []byte, highest, lowest int64, _ error) {
highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter) highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 { if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
@ -2222,7 +2222,7 @@ func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels
return compressed, highest, lowest, nil return compressed, highest, lowest, nil
} }
func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.TimeSeries) bool) (int64, int64, []writev2.TimeSeries, int, int, int) { func buildV2TimeSeries(timeSeries []*writev2.TimeSeries, filter func(*writev2.TimeSeries) bool) (int64, int64, []*writev2.TimeSeries, int, int, int) {
var highest int64 var highest int64
var lowest int64 var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int var droppedSamples, droppedExemplars, droppedHistograms int

View file

@ -88,14 +88,14 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
externalLabels := h.config().GlobalConfig.ExternalLabels.Map() externalLabels := h.config().GlobalConfig.ExternalLabels.Map()
sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels)) sortedExternalLabels := make([]*prompb.Label, 0, len(externalLabels))
for name, value := range externalLabels { for name, value := range externalLabels {
sortedExternalLabels = append(sortedExternalLabels, prompb.Label{ sortedExternalLabels = append(sortedExternalLabels, &prompb.Label{
Name: name, Name: name,
Value: value, Value: value,
}) })
} }
slices.SortFunc(sortedExternalLabels, func(a, b prompb.Label) int { slices.SortFunc(sortedExternalLabels, func(a, b *prompb.Label) int {
return strings.Compare(a.Name, b.Name) return strings.Compare(a.Name, b.Name)
}) })
@ -119,7 +119,7 @@ func (h *readHandler) remoteReadSamples(
w http.ResponseWriter, w http.ResponseWriter,
req *prompb.ReadRequest, req *prompb.ReadRequest,
externalLabels map[string]string, externalLabels map[string]string,
sortedExternalLabels []prompb.Label, sortedExternalLabels []*prompb.Label,
) { ) {
w.Header().Set("Content-Type", "application/x-protobuf") w.Header().Set("Content-Type", "application/x-protobuf")
w.Header().Set("Content-Encoding", "snappy") w.Header().Set("Content-Encoding", "snappy")
@ -186,7 +186,7 @@ func (h *readHandler) remoteReadSamples(
} }
} }
func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) { func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []*prompb.Label) {
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
f, ok := w.(http.Flusher) f, ok := w.(http.Flusher)

View file

@ -278,7 +278,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
return nil return nil
} }
func (h *writeHandler) appendV1Samples(app storage.Appender, ss []prompb.Sample, labels labels.Labels) error { func (h *writeHandler) appendV1Samples(app storage.Appender, ss []*prompb.Sample, labels labels.Labels) error {
var ref storage.SeriesRef var ref storage.SeriesRef
var err error var err error
for _, s := range ss { for _, s := range ss {
@ -295,7 +295,7 @@ func (h *writeHandler) appendV1Samples(app storage.Appender, ss []prompb.Sample,
return nil return nil
} }
func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Histogram, labels labels.Labels) error { func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []*prompb.Histogram, labels labels.Labels) error {
var err error var err error
for _, hp := range hh { for _, hp := range hh {
if hp.IsFloatHistogram() { if hp.IsFloatHistogram() {

View file

@ -69,7 +69,7 @@ func MetricFamiliesToWriteRequest(mf map[string]*dto.MetricFamily, extraLabels m
for _, metricName := range sortedMetricNames { for _, metricName := range sortedMetricNames {
// Set metadata writerequest // Set metadata writerequest
mtype := MetricMetadataTypeValue[mf[metricName].Type.String()] mtype := MetricMetadataTypeValue[mf[metricName].Type.String()]
metadata := prompb.MetricMetadata{ metadata := &prompb.MetricMetadata{
MetricFamilyName: mf[metricName].GetName(), MetricFamilyName: mf[metricName].GetName(),
Type: prompb.MetricMetadata_MetricType(mtype), Type: prompb.MetricMetadata_MetricType(mtype),
Help: mf[metricName].GetHelp(), Help: mf[metricName].GetHelp(),
@ -87,9 +87,9 @@ func MetricFamiliesToWriteRequest(mf map[string]*dto.MetricFamily, extraLabels m
} }
func toTimeseries(wr *prompb.WriteRequest, labels map[string]string, timestamp int64, value float64) { func toTimeseries(wr *prompb.WriteRequest, labels map[string]string, timestamp int64, value float64) {
var ts prompb.TimeSeries var ts *prompb.TimeSeries
ts.Labels = makeLabels(labels) ts.Labels = makeLabels(labels)
ts.Samples = []prompb.Sample{ ts.Samples = []*prompb.Sample{
{ {
Timestamp: timestamp, Timestamp: timestamp,
Value: value, Value: value,
@ -161,7 +161,7 @@ func makeTimeseries(wr *prompb.WriteRequest, labels map[string]string, m *dto.Me
return err return err
} }
func makeLabels(labelsMap map[string]string) []prompb.Label { func makeLabels(labelsMap map[string]string) []*prompb.Label {
// build labels name list // build labels name list
sortedLabelNames := make([]string, 0, len(labelsMap)) sortedLabelNames := make([]string, 0, len(labelsMap))
for label := range labelsMap { for label := range labelsMap {
@ -170,9 +170,9 @@ func makeLabels(labelsMap map[string]string) []prompb.Label {
// sort labels name in lexicographical order // sort labels name in lexicographical order
sort.Strings(sortedLabelNames) sort.Strings(sortedLabelNames)
var labels []prompb.Label var labels []*prompb.Label
for _, label := range sortedLabelNames { for _, label := range sortedLabelNames {
labels = append(labels, prompb.Label{ labels = append(labels, &prompb.Label{
Name: label, Name: label,
Value: labelsMap[label], Value: labelsMap[label],
}) })