refactor out common code between write methods

Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Nicolás Pazos 2023-10-02 17:36:42 -03:00
parent 407e596ce3
commit 2e57d7e645

View file

@ -19,6 +19,8 @@ import (
"fmt"
"net/http"
"github.com/prometheus/prometheus/model/labels"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -129,7 +131,6 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
err = app.Commit()
}()
var exemplarErr error
for _, ts := range req.Timeseries {
labels := labelProtosToLabels(ts.Labels)
if !labels.IsValid() {
@ -137,9 +138,47 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
samplesWithInvalidLabels++
continue
}
err := h.appendSamples(app, ts.Samples, labels)
if err != nil {
return err
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
h.appendExemplar(app, e, labels, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, labels)
if err != nil {
return err
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
if samplesWithInvalidLabels > 0 {
h.samplesWithInvalidLabelsTotal.Add(float64(samplesWithInvalidLabels))
}
return nil
}
func (h *writeHandler) appendExemplar(app storage.Appender, e exemplar.Exemplar, labels labels.Labels, outOfOrderExemplarErrs *int) {
_, err := app.AppendExemplar(0, labels, e)
err = h.checkAppendExemplarError(err, e, outOfOrderExemplarErrs)
if err != nil {
// Since exemplar storage is still experimental, we don't fail the request on ingestion errors
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", err)
}
}
func (h *writeHandler) appendSamples(app storage.Appender, ss []prompb.Sample, labels labels.Labels) error {
var ref storage.SeriesRef
for _, s := range ts.Samples {
ref, err = app.Append(ref, labels, s.Timestamp, s.Value)
var err error
for _, s := range ss {
ref, err = app.Append(ref, labels, s.Timestamp, s.
Value)
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
@ -150,21 +189,13 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}
return err
}
}
return nil
}
for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(ep)
_, exemplarErr = app.AppendExemplar(0, labels, e)
exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs)
if exemplarErr != nil {
// Since exemplar storage is still experimental, we don't fail the request on ingestion errors.
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
}
}
for _, hp := range ts.Histograms {
func (h *writeHandler) appendHistograms(app storage.Appender, hh []prompb.Histogram, labels labels.Labels) error {
var err error
for _, hp := range hh {
if hp.IsFloatHistogram() {
fhs := FloatHistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
@ -185,15 +216,6 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
return err
}
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
if samplesWithInvalidLabels > 0 {
h.samplesWithInvalidLabelsTotal.Add(float64(samplesWithInvalidLabels))
}
return nil
}
@ -268,53 +290,30 @@ func (h *writeHandler) writeReduced(ctx context.Context, req *prompb.WriteReques
err = app.Commit()
}()
var exemplarErr error
for _, ts := range req.Timeseries {
labels := labelRefProtosToLabels(req.StringSymbolTable, ts.Labels)
// TODO(npazosmendez): ?
// if !labels.IsValid() {
// level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String())
// samplesWithInvalidLabels++
// continue
// }
for _, s := range ts.Samples {
_, err = app.Append(0, labels, s.Timestamp, s.Value)
err := h.appendSamples(app, ts.Samples, labels)
if err != nil {
unwrapedErr := errors.Unwrap(err)
if errors.Is(unwrapedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrapedErr, storage.ErrOutOfBounds) || errors.Is(unwrapedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp)
}
return err
}
}
for _, ep := range ts.Exemplars {
e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep)
_, exemplarErr = app.AppendExemplar(0, labels, e)
exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs)
if exemplarErr != nil {
// Since exemplar storage is still experimental, we don't fail the request on ingestion errors.
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
}
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fhs := FloatHistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
} else {
hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
h.appendExemplar(app, e, labels, &outOfOrderExemplarErrs)
}
err = h.appendHistograms(app, ts.Histograms, labels)
if err != nil {
unwrappedErr := errors.Unwrap(err)
// Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
// a note indicating its inclusion in the future.
if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp)
}
return err
}
}
}
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)