From 2e57d7e64537cdce3c0dd95c5a162a2e7dddae71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Pazos?= Date: Mon, 2 Oct 2023 17:36:42 -0300 Subject: [PATCH] refactor out common code between write methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nicolás Pazos --- storage/remote/write_handler.go | 155 ++++++++++++++++---------------- 1 file changed, 77 insertions(+), 78 deletions(-) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index ce756254b..2aabcfa02 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -19,6 +19,8 @@ import ( "fmt" "net/http" + "github.com/prometheus/prometheus/model/labels" + "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -129,7 +131,6 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err err = app.Commit() }() - var exemplarErr error for _, ts := range req.Timeseries { labels := labelProtosToLabels(ts.Labels) if !labels.IsValid() { @@ -137,53 +138,19 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err samplesWithInvalidLabels++ continue } - var ref storage.SeriesRef - for _, s := range ts.Samples { - ref, err = app.Append(ref, labels, s.Timestamp, s.Value) - if err != nil { - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { - level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) - } - return err - } - + err := h.appendSamples(app, ts.Samples, labels) + if err != nil { + return err } for _, ep := range ts.Exemplars { e := exemplarProtoToExemplar(ep) - - _, exemplarErr = app.AppendExemplar(0, labels, e) - exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs) - if exemplarErr != nil { - // Since exemplar storage is still experimental, we don't fail the request on ingestion errors. - level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) - } + h.appendExemplar(app, e, labels, &outOfOrderExemplarErrs) } - for _, hp := range ts.Histograms { - if hp.IsFloatHistogram() { - fhs := FloatHistogramProtoToFloatHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) - } else { - hs := HistogramProtoToHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) - } - if err != nil { - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - // Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is - // a note indicating its inclusion in the future. - if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { - level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) - } - return err - } + err = h.appendHistograms(app, ts.Histograms, labels) + if err != nil { + return err } } @@ -197,6 +164,61 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err return nil } +func (h *writeHandler) appendExemplar(app storage.Appender, e exemplar.Exemplar, labels labels.Labels, outOfOrderExemplarErrs *int) { + _, err := app.AppendExemplar(0, labels, e) + err = h.checkAppendExemplarError(err, e, outOfOrderExemplarErrs) + if err != nil { + // Since exemplar storage is still experimental, we don't fail the request on ingestion errors + level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", err) + } +} + +func (h *writeHandler) appendSamples(app storage.Appender, ss []prompb.Sample, labels labels.Labels) error { + var ref storage.SeriesRef + var err error + for _, s := range ss { + ref, err = app.Append(ref, labels, s.Timestamp, s. + Value) + if err != nil { + unwrappedErr := errors.Unwrap(err) + if unwrappedErr == nil { + unwrappedErr = err + } + if errors.Is(err, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { + level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) + } + return err + } + } + return nil +} + +func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histogram, labels labels.Labels) error { + var err error + for _, hp := range hh { + if hp.IsFloatHistogram() { + fhs := FloatHistogramProtoToFloatHistogram(hp) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) + } else { + hs := HistogramProtoToHistogram(hp) + _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) + } + if err != nil { + unwrappedErr := errors.Unwrap(err) + if unwrappedErr == nil { + unwrappedErr = err + } + // Although AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is + // a note indicating its inclusion in the future. + if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { + level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) + } + return err + } + } + return nil +} + // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler { @@ -268,51 +290,28 @@ func (h *writeHandler) writeReduced(ctx context.Context, req *prompb.WriteReques err = app.Commit() }() - var exemplarErr error for _, ts := range req.Timeseries { labels := labelRefProtosToLabels(req.StringSymbolTable, ts.Labels) + // TODO(npazosmendez): ? + // if !labels.IsValid() { + // level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String()) + // samplesWithInvalidLabels++ + // continue + // } - for _, s := range ts.Samples { - _, err = app.Append(0, labels, s.Timestamp, s.Value) - if err != nil { - unwrapedErr := errors.Unwrap(err) - if errors.Is(unwrapedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrapedErr, storage.ErrOutOfBounds) || errors.Is(unwrapedErr, storage.ErrDuplicateSampleForTimestamp) { - level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp) - } - return err - } - + err := h.appendSamples(app, ts.Samples, labels) + if err != nil { + return err } for _, ep := range ts.Exemplars { e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep) - - _, exemplarErr = app.AppendExemplar(0, labels, e) - exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs) - if exemplarErr != nil { - // Since exemplar storage is still experimental, we don't fail the request on ingestion errors. - level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) - } + h.appendExemplar(app, e, labels, &outOfOrderExemplarErrs) } - for _, hp := range ts.Histograms { - if hp.IsFloatHistogram() { - fhs := FloatHistogramProtoToFloatHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) - } else { - hs := HistogramProtoToHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) - } - - if err != nil { - unwrappedErr := errors.Unwrap(err) - // Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is - // a note indicating its inclusion in the future. - if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) { - level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp) - } - return err - } + err = h.appendHistograms(app, ts.Histograms, labels) + if err != nil { + return err } }