mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
tsdb: Add support for out-of-order exemplars
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
f4b1fcb73e
commit
8cba4f8113
|
@ -16,6 +16,7 @@ package tsdb
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
@ -273,6 +274,42 @@ func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// outOfOrderInsertionPoint determines the insertion point for an out of order exemplar.
|
||||||
|
// If it should instead turn out to be a duplicate, an error is returned.
|
||||||
|
func (ce *CircularExemplarStorage) outOfOrderInsertionPoint(key []byte, e exemplar.Exemplar) (int, error) {
|
||||||
|
idx := ce.index[string(key)]
|
||||||
|
|
||||||
|
oldest := ce.exemplars[idx.oldest].exemplar
|
||||||
|
if e.Ts < oldest.Ts || (e.Ts == oldest.Ts &&
|
||||||
|
(e.Value < oldest.Value || (e.Value == oldest.Value && e.Labels.Hash() < oldest.Labels.Hash()))) {
|
||||||
|
// This is an out of order exemplar preceding the up till now oldest exemplar of the series
|
||||||
|
return idx.oldest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
i := idx.oldest
|
||||||
|
// Search for insertion point
|
||||||
|
for {
|
||||||
|
current := ce.exemplars[i]
|
||||||
|
next := ce.exemplars[current.next].exemplar
|
||||||
|
if e.Equals(current.exemplar) || e.Equals(next) {
|
||||||
|
return -1, storage.ErrDuplicateExemplar
|
||||||
|
}
|
||||||
|
|
||||||
|
if e.Ts < next.Ts || (e.Ts == next.Ts &&
|
||||||
|
(e.Value < next.Value || (e.Value == next.Value && e.Labels.Hash() < next.Labels.Hash()))) {
|
||||||
|
// Found our insertion point
|
||||||
|
return i, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
i = current.next
|
||||||
|
if i == noExemplar {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1, fmt.Errorf("could not find out of order exemplar insertion point")
|
||||||
|
}
|
||||||
|
|
||||||
// Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it.
|
// Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it.
|
||||||
// Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
|
// Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
|
||||||
func (ce *CircularExemplarStorage) Resize(l int64) int {
|
func (ce *CircularExemplarStorage) Resize(l int64) int {
|
||||||
|
@ -361,6 +398,7 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
|
||||||
ce.lock.Lock()
|
ce.lock.Lock()
|
||||||
defer ce.lock.Unlock()
|
defer ce.lock.Unlock()
|
||||||
|
|
||||||
|
insertionPoint := -1
|
||||||
idx, ok := ce.index[string(seriesLabels)]
|
idx, ok := ce.index[string(seriesLabels)]
|
||||||
err := ce.validateExemplar(idx, e, true)
|
err := ce.validateExemplar(idx, e, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -368,8 +406,15 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
|
||||||
// Duplicate exemplar, noop.
|
// Duplicate exemplar, noop.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if errors.Is(err, storage.ErrOutOfOrderExemplar) {
|
||||||
|
insertionPoint, err = ce.outOfOrderInsertionPoint(seriesLabels, e)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
|
idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
|
||||||
|
@ -396,7 +441,25 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
|
||||||
ce.exemplars[ce.nextIndex].next = noExemplar
|
ce.exemplars[ce.nextIndex].next = noExemplar
|
||||||
ce.exemplars[ce.nextIndex].exemplar = e
|
ce.exemplars[ce.nextIndex].exemplar = e
|
||||||
ce.exemplars[ce.nextIndex].ref = idx
|
ce.exemplars[ce.nextIndex].ref = idx
|
||||||
|
// Only set newest if not an out of order exemplar
|
||||||
|
if insertionPoint == -1 {
|
||||||
idx.newest = ce.nextIndex
|
idx.newest = ce.nextIndex
|
||||||
|
} else {
|
||||||
|
// The exemplar is out of order
|
||||||
|
prevEntry := ce.exemplars[insertionPoint]
|
||||||
|
idx := prevEntry.ref
|
||||||
|
oldest := ce.exemplars[idx.oldest].exemplar
|
||||||
|
if e.Ts < oldest.Ts || (e.Ts == oldest.Ts &&
|
||||||
|
(e.Value < oldest.Value || (e.Value == oldest.Value && e.Labels.Hash() < oldest.Labels.Hash()))) {
|
||||||
|
// The exemplar should be inserted before the oldest one
|
||||||
|
ce.exemplars[ce.nextIndex].next = ce.nextIndex
|
||||||
|
idx.oldest = ce.nextIndex
|
||||||
|
} else {
|
||||||
|
// The exemplar should be inserted after prevEntry
|
||||||
|
ce.exemplars[ce.nextIndex].next = prevEntry.next
|
||||||
|
prevEntry.next = ce.nextIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
|
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue