implementation

Signed-off-by: Vanshikav123 <vanshikav928@gmail.com>
This commit is contained in:
Vanshikav123 2024-08-22 01:55:53 +05:30
parent 7cfe0b1567
commit 638902487e
8 changed files with 53 additions and 5 deletions

View file

@ -1587,6 +1587,11 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
type notReadyAppender struct{}
// SetHints implements storage.Appender.
func (n notReadyAppender) SetHints(hints *storage.AppendHints) {
panic("unimplemented")
}
func (n notReadyAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}

View file

@ -43,6 +43,10 @@ func (a nopAppendable) Appender(_ context.Context) storage.Appender {
type nopAppender struct{}
func (a nopAppender) SetHints(hints *storage.AppendHints) {
panic("unimplemented")
}
func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
return 0, nil
}
@ -109,6 +113,11 @@ type collectResultAppender struct {
pendingMetadata []metadata.Metadata
}
// SetHints implements storage.Appender.
func (a *collectResultAppender) SetHints(hints *storage.AppendHints) {
panic("unimplemented")
}
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()

View file

@ -148,6 +148,9 @@ type fanoutAppender struct {
secondaries []Appender
}
func (f *fanoutAppender) SetHints(hints *AppendHints) {
panic("unimplemented")
}
func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
ref, err := f.primary.Append(ref, l, t, v)
if err != nil {

View file

@ -237,6 +237,10 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
return f(mint, maxt)
}
type AppendHints struct {
DiscardOutOfOrder bool
}
// Appender provides batched appends against a storage.
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
//
@ -265,6 +269,9 @@ type Appender interface {
// Appender has to be discarded after rollback.
Rollback() error
// New SetHints method to set the append hints.
SetHints(hints *AppendHints)
ExemplarAppender
HistogramAppender
MetadataUpdater

View file

@ -284,6 +284,10 @@ type timestampTracker struct {
highestRecvTimestamp *maxTimestamp
}
func (t *timestampTracker) SetHints(hints *storage.AppendHints) {
panic("unimplemented")
}
// Append implements storage.Appender.
func (t *timestampTracker) Append(_ storage.SeriesRef, _ labels.Labels, ts int64, _ float64) (storage.SeriesRef, error) {
t.samples++

View file

@ -822,7 +822,9 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
}
return m
}
func (m *mockAppendable) SetHints(hints *storage.AppendHints) {
panic("unimplemented")
}
func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if m.appendSampleErr != nil {
return 0, m.appendSampleErr

View file

@ -784,6 +784,10 @@ type appender struct {
floatHistogramSeries []*memSeries
}
func (a *appender) SetHints(hints *storage.AppendHints) {
panic("unimplemented")
}
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// series references and chunk references are identical for agent mode.
headRef := chunks.HeadSeriesRef(ref)

View file

@ -36,12 +36,17 @@ import (
// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppender struct {
app storage.Appender
head *Head
app storage.Appender
head *Head
hints *storage.AppendHints
}
var _ storage.GetRef = &initAppender{}
func (a *initAppender) SetHints(hints *storage.AppendHints) {
a.hints = hints
}
func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.Append(ref, lset, t, v)
@ -318,8 +323,12 @@ type headAppender struct {
appendID, cleanupAppendIDsBelow uint64
closed bool
hints *storage.AppendHints
}
func (a *headAppender) SetHints(hints *storage.AppendHints) {
a.hints = hints
}
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append.
// If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work.
@ -347,13 +356,18 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
}
s.Lock()
defer s.Unlock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err == nil {
if isOOO && a.hints != nil && a.hints.DiscardOutOfOrder {
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
return 0, storage.ErrOutOfOrderSample
}
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}