diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 09c44f0338..104444332a 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -770,6 +770,29 @@ func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels { return b.Labels() } +func labelRefProtosToLabels(st map[uint64]string, lbls []prompb.LabelRef) labels.Labels { + result := make(labels.Labels, 0, len(lbls)) + for _, l := range lbls { + result = append(result, labels.Label{ + Name: st[l.NameRef], + Value: st[l.ValueRef], + }) + } + sort.Sort(result) + return result +} + +func exemplarRefProtoToExemplar(st map[uint64]string, ep prompb.ExemplarRef) exemplar.Exemplar { + timestamp := ep.Timestamp + + return exemplar.Exemplar{ + Labels: labelRefProtosToLabels(st, ep.Labels), + Value: ep.Value, + Ts: timestamp, + HasTs: timestamp != 0, + } +} + // labelsToLabelsProto transforms labels into prompb labels. The buffer slice // will be used to avoid allocations if it is big enough to store the labels. func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label { @@ -908,6 +931,7 @@ func ReducedWriteRequestToWriteRequest(redReq *prompb.WriteRequestWithRefs) (*pr exemplars[j].Value = e.Value exemplars[j].Timestamp = e.Timestamp exemplars[j].Labels = make([]prompb.Label, len(e.Labels)) + for k, l := range e.Labels { exemplars[j].Labels[k].Name = redReq.StringSymbolTable[l.NameRef] exemplars[j].Labels[k].Value = redReq.StringSymbolTable[l.ValueRef] diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index d0d3284fd2..ce756254b7 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -64,12 +64,9 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var err error var req *prompb.WriteRequest + var reqWithRefs *prompb.WriteRequestWithRefs if h.internFormat { - var redReq *prompb.WriteRequestWithRefs - redReq, err = DecodeReducedWriteRequest(r.Body) - if err == nil { - req, err = ReducedWriteRequestToWriteRequest(redReq) - } + reqWithRefs, err = DecodeReducedWriteRequest(r.Body) } else { req, err = DecodeWriteRequest(r.Body) } @@ -80,7 +77,12 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - err = h.write(r.Context(), req) + if h.internFormat { + err = h.writeReduced(r.Context(), reqWithRefs) + } else { + err = h.write(r.Context(), req) + } + switch { case err == nil: case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp): @@ -253,3 +255,70 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } + +func (h *writeHandler) writeReduced(ctx context.Context, req *prompb.WriteRequestWithRefs) (err error) { + outOfOrderExemplarErrs := 0 + + app := h.appendable.Appender(ctx) + defer func() { + if err != nil { + _ = app.Rollback() + return + } + err = app.Commit() + }() + + var exemplarErr error + for _, ts := range req.Timeseries { + labels := labelRefProtosToLabels(req.StringSymbolTable, ts.Labels) + + for _, s := range ts.Samples { + _, err = app.Append(0, labels, s.Timestamp, s.Value) + if err != nil { + unwrapedErr := errors.Unwrap(err) + if errors.Is(unwrapedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrapedErr, storage.ErrOutOfBounds) || errors.Is(unwrapedErr, storage.ErrDuplicateSampleForTimestamp) { + level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) + } + return err + } + + } + + for _, ep := range ts.Exemplars { + e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep) + + _, exemplarErr = app.AppendExemplar(0, labels, e) + exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs) + if exemplarErr != nil { + // Since exemplar storage is still experimental, we don't fail the request on ingestion errors. + level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) + } + } + + for _, hp := range ts.Histograms { + if hp.IsFloatHistogram() { + fhs := FloatHistogramProtoToFloatHistogram(hp) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) + } else { + hs := HistogramProtoToHistogram(hp) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) + } + + if err != nil { + unwrappedErr := errors.Unwrap(err) + // Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is + // a note indicating its inclusion in the future. + if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { + level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) + } + return err + } + } + } + + if outOfOrderExemplarErrs > 0 { + _ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs) + } + + return nil +}