From 8cba4f811389a9e49024f248b37690078605d0b1 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 14 Feb 2024 10:24:58 +0100 Subject: [PATCH] tsdb: Add support for out-of-order exemplars Signed-off-by: Arve Knudsen --- tsdb/exemplar.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index 7545ab9a6..7306d5845 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -16,6 +16,7 @@ package tsdb import ( "context" "errors" + "fmt" "slices" "sync" "unicode/utf8" @@ -273,6 +274,42 @@ func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar. return nil } +// outOfOrderInsertionPoint determines the insertion point for an out of order exemplar. +// If it should instead turn out to be a duplicate, an error is returned. +func (ce *CircularExemplarStorage) outOfOrderInsertionPoint(key []byte, e exemplar.Exemplar) (int, error) { + idx := ce.index[string(key)] + + oldest := ce.exemplars[idx.oldest].exemplar + if e.Ts < oldest.Ts || (e.Ts == oldest.Ts && + (e.Value < oldest.Value || (e.Value == oldest.Value && e.Labels.Hash() < oldest.Labels.Hash()))) { + // This is an out of order exemplar preceding the up till now oldest exemplar of the series + return idx.oldest, nil + } + + i := idx.oldest + // Search for insertion point + for { + current := ce.exemplars[i] + next := ce.exemplars[current.next].exemplar + if e.Equals(current.exemplar) || e.Equals(next) { + return -1, storage.ErrDuplicateExemplar + } + + if e.Ts < next.Ts || (e.Ts == next.Ts && + (e.Value < next.Value || (e.Value == next.Value && e.Labels.Hash() < next.Labels.Hash()))) { + // Found our insertion point + return i, nil + } + + i = current.next + if i == noExemplar { + break + } + } + + return -1, fmt.Errorf("could not find out of order exemplar insertion point") +} + // Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. // Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed. func (ce *CircularExemplarStorage) Resize(l int64) int { @@ -361,6 +398,7 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp ce.lock.Lock() defer ce.lock.Unlock() + insertionPoint := -1 idx, ok := ce.index[string(seriesLabels)] err := ce.validateExemplar(idx, e, true) if err != nil { @@ -368,7 +406,14 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp // Duplicate exemplar, noop. return nil } - return err + if errors.Is(err, storage.ErrOutOfOrderExemplar) { + insertionPoint, err = ce.outOfOrderInsertionPoint(seriesLabels, e) + if err != nil { + return err + } + } else { + return err + } } if !ok { @@ -396,7 +441,25 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp ce.exemplars[ce.nextIndex].next = noExemplar ce.exemplars[ce.nextIndex].exemplar = e ce.exemplars[ce.nextIndex].ref = idx + // Only set newest if not an out of order exemplar + if insertionPoint == -1 { idx.newest = ce.nextIndex + } else { + // The exemplar is out of order + prevEntry := ce.exemplars[insertionPoint] + idx := prevEntry.ref + oldest := ce.exemplars[idx.oldest].exemplar + if e.Ts < oldest.Ts || (e.Ts == oldest.Ts && + (e.Value < oldest.Value || (e.Value == oldest.Value && e.Labels.Hash() < oldest.Labels.Hash()))) { + // The exemplar should be inserted before the oldest one + ce.exemplars[ce.nextIndex].next = ce.nextIndex + idx.oldest = ce.nextIndex + } else { + // The exemplar should be inserted after prevEntry + ce.exemplars[ce.nextIndex].next = prevEntry.next + prevEntry.next = ce.nextIndex + } + } ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)