mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
no-brainer copypaste but more performance write support
This commit is contained in:
parent
0062b91460
commit
fff56c0df7
|
@ -770,6 +770,29 @@ func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
|
|||
return b.Labels()
|
||||
}
|
||||
|
||||
func labelRefProtosToLabels(st map[uint64]string, lbls []prompb.LabelRef) labels.Labels {
|
||||
result := make(labels.Labels, 0, len(lbls))
|
||||
for _, l := range lbls {
|
||||
result = append(result, labels.Label{
|
||||
Name: st[l.NameRef],
|
||||
Value: st[l.ValueRef],
|
||||
})
|
||||
}
|
||||
sort.Sort(result)
|
||||
return result
|
||||
}
|
||||
|
||||
func exemplarRefProtoToExemplar(st map[uint64]string, ep prompb.ExemplarRef) exemplar.Exemplar {
|
||||
timestamp := ep.Timestamp
|
||||
|
||||
return exemplar.Exemplar{
|
||||
Labels: labelRefProtosToLabels(st, ep.Labels),
|
||||
Value: ep.Value,
|
||||
Ts: timestamp,
|
||||
HasTs: timestamp != 0,
|
||||
}
|
||||
}
|
||||
|
||||
// labelsToLabelsProto transforms labels into prompb labels. The buffer slice
|
||||
// will be used to avoid allocations if it is big enough to store the labels.
|
||||
func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label {
|
||||
|
@ -908,6 +931,7 @@ func ReducedWriteRequestToWriteRequest(redReq *prompb.WriteRequestWithRefs) (*pr
|
|||
exemplars[j].Value = e.Value
|
||||
exemplars[j].Timestamp = e.Timestamp
|
||||
exemplars[j].Labels = make([]prompb.Label, len(e.Labels))
|
||||
|
||||
for k, l := range e.Labels {
|
||||
exemplars[j].Labels[k].Name = redReq.StringSymbolTable[l.NameRef]
|
||||
exemplars[j].Labels[k].Value = redReq.StringSymbolTable[l.ValueRef]
|
||||
|
|
|
@ -64,12 +64,9 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
|
|||
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
var err error
|
||||
var req *prompb.WriteRequest
|
||||
var reqWithRefs *prompb.WriteRequestWithRefs
|
||||
if h.internFormat {
|
||||
var redReq *prompb.WriteRequestWithRefs
|
||||
redReq, err = DecodeReducedWriteRequest(r.Body)
|
||||
if err == nil {
|
||||
req, err = ReducedWriteRequestToWriteRequest(redReq)
|
||||
}
|
||||
reqWithRefs, err = DecodeReducedWriteRequest(r.Body)
|
||||
} else {
|
||||
req, err = DecodeWriteRequest(r.Body)
|
||||
}
|
||||
|
@ -80,7 +77,11 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
err = h.write(r.Context(), req)
|
||||
if h.internFormat {
|
||||
err = h.writeReduced(r.Context(), reqWithRefs)
|
||||
} else {
|
||||
err = h.write(r.Context(), req)
|
||||
}
|
||||
switch err {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample, storage.ErrOutOfBounds, storage.ErrDuplicateSampleForTimestamp:
|
||||
|
@ -251,3 +252,70 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *writeHandler) writeReduced(ctx context.Context, req *prompb.WriteRequestWithRefs) (err error) {
|
||||
outOfOrderExemplarErrs := 0
|
||||
|
||||
app := h.appendable.Appender(ctx)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
_ = app.Rollback()
|
||||
return
|
||||
}
|
||||
err = app.Commit()
|
||||
}()
|
||||
|
||||
var exemplarErr error
|
||||
for _, ts := range req.Timeseries {
|
||||
labels := labelRefProtosToLabels(req.StringSymbolTable, ts.Labels)
|
||||
|
||||
for _, s := range ts.Samples {
|
||||
_, err = app.Append(0, labels, s.Timestamp, s.Value)
|
||||
if err != nil {
|
||||
unwrapedErr := errors.Unwrap(err)
|
||||
if errors.Is(unwrapedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrapedErr, storage.ErrOutOfBounds) || errors.Is(unwrapedErr, storage.ErrDuplicateSampleForTimestamp) {
|
||||
level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error(), "series", labels.String(), "timestamp", s.Timestamp)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for _, ep := range ts.Exemplars {
|
||||
e := exemplarRefProtoToExemplar(req.StringSymbolTable, ep)
|
||||
|
||||
_, exemplarErr = app.AppendExemplar(0, labels, e)
|
||||
exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs)
|
||||
if exemplarErr != nil {
|
||||
// Since exemplar storage is still experimental, we don't fail the request on ingestion errors.
|
||||
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
|
||||
}
|
||||
}
|
||||
|
||||
for _, hp := range ts.Histograms {
|
||||
if hp.IsFloatHistogram() {
|
||||
fhs := FloatHistogramProtoToFloatHistogram(hp)
|
||||
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
|
||||
} else {
|
||||
hs := HistogramProtoToHistogram(hp)
|
||||
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
unwrappedErr := errors.Unwrap(err)
|
||||
// Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is
|
||||
// a note indicating its inclusion in the future.
|
||||
if errors.Is(unwrappedErr, storage.ErrOutOfOrderSample) || errors.Is(unwrappedErr, storage.ErrOutOfBounds) || errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp) {
|
||||
level.Error(h.logger).Log("msg", "Out of order histogram from remote write", "err", err.Error(), "series", labels.String(), "timestamp", hp.Timestamp)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if outOfOrderExemplarErrs > 0 {
|
||||
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue