storage: add new interface to append with CT

This change does the following:
- Add an interface to append a sample with CT
- Update headAppender and initAppender to implement AppendWithCT
- Update the RefSample to include CT
- Update the scrape loop to add the CT to samples when CT is enabled

This change doesn't update the remote storage wlog watcher to make use
of the new CT feild in the RefSample, but that can be done in a
following PR.

We should compare using benchmarks how this compares to adding the CT to
the metadata (which also goes in the WAL)

Signed-off-by: Ridwan Sharif <ridwanmsharif@google.com>
This commit is contained in:
Ridwan Sharif 2024-10-24 18:19:20 +00:00
parent d0eecb1223
commit 179499523b
7 changed files with 199 additions and 54 deletions

View file

@ -1613,6 +1613,12 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
// Take an appender with limits. // Take an appender with limits.
app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema) app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
var appWithCT storage.AppenderWithCT
var canEncodeCT bool
if sl.enableCTZeroIngestion {
appWithCT, canEncodeCT = app.(storage.AppenderWithCT)
}
defer func() { defer func() {
if err != nil { if err != nil {
return return
@ -1726,21 +1732,23 @@ loop:
if seriesAlreadyScraped && parsedTimestamp == nil { if seriesAlreadyScraped && parsedTimestamp == nil {
err = storage.ErrDuplicateSampleForTimestamp err = storage.ErrDuplicateSampleForTimestamp
} else { } else {
var ct int64
if sl.enableCTZeroIngestion { if sl.enableCTZeroIngestion {
if ctMs := p.CreatedTimestamp(); ctMs != nil { if ctMs := p.CreatedTimestamp(); ctMs != nil {
ct = *ctMs
if isHistogram && sl.enableNativeHistogramIngestion { if isHistogram && sl.enableNativeHistogramIngestion {
if h != nil { if h != nil {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, h, nil) ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, ct, h, nil)
} else { } else {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, nil, fh) ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, ct, nil, fh)
} }
} else { } else {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) ref, err = app.AppendCTZeroSample(ref, lset, t, ct)
} }
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the // CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug. // scrape on errors updating the created timestamp, log debug.
sl.l.Debug("Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) sl.l.Debug("Error when appending CT in scrape loop", "series", string(met), "ct", ct, "t", t, "err", err)
} }
} }
} }
@ -1752,7 +1760,11 @@ loop:
ref, err = app.AppendHistogram(ref, lset, t, nil, fh) ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
} }
} else { } else {
ref, err = app.Append(ref, lset, t, val) if canEncodeCT && ct != 0 {
ref, err = appWithCT.AppendWithCT(ref, lset, t, val, ct)
} else {
ref, err = app.Append(ref, lset, t, val)
}
} }
} }

View file

@ -285,6 +285,12 @@ type Appender interface {
CreatedTimestampAppender CreatedTimestampAppender
} }
type AppenderWithCT interface {
Appender
AppendWithCT(ref SeriesRef, l labels.Labels, t int64, v float64, ct int64) (SeriesRef, error)
}
// GetRef is an extra interface on Appenders used by downstream projects // GetRef is an extra interface on Appenders used by downstream projects
// (e.g. Cortex) to avoid maintaining a parallel set of references. // (e.g. Cortex) to avoid maintaining a parallel set of references.
type GetRef interface { type GetRef interface {

View file

@ -46,6 +46,21 @@ func (a *initAppender) SetOptions(opts *storage.AppendOptions) {
} }
} }
func (a *initAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t int64, v float64, ct int64) (storage.SeriesRef, error) {
if a.app != nil {
appWithCT, ok := a.app.(storage.AppenderWithCT)
if ok {
return appWithCT.AppendWithCT(ref, lset, t, v, ct)
}
return a.app.Append(ref, lset, t, v)
}
a.head.initTime(t)
headAppender := a.head.appender()
a.app = headAppender
return headAppender.AppendWithCT(ref, lset, t, v, ct)
}
func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if a.app != nil { if a.app != nil {
return a.app.Append(ref, lset, t, v) return a.app.Append(ref, lset, t, v)
@ -340,6 +355,55 @@ func (a *headAppender) SetOptions(opts *storage.AppendOptions) {
} }
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return a.appendWithCT(ref, lset, t, v, 0)
}
func (a *headAppender) AppendWithCT(ref storage.SeriesRef, lset labels.Labels, t int64, v float64, ct int64) (storage.SeriesRef, error) {
return a.appendWithCT(ref, lset, t, v, ct)
}
// AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns
// error when sample can't be appended. See
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
if ct >= t {
return 0, storage.ErrCTNewerThanSample
}
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, _, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
}
// Check if CT wouldn't be OOO vs samples we already might have for this series.
// NOTE(bwplotka): This will be often hit as it's expected for long living
// counters to share the same CT.
s.Lock()
isOOO, _, err := s.appendable(ct, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if err != nil {
return 0, err
}
if isOOO {
return storage.SeriesRef(s.ref), storage.ErrOutOfOrderCT
}
if ct > a.maxt {
a.maxt = ct
}
a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0, CT: ct})
a.sampleSeries = append(a.sampleSeries, s)
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) appendWithCT(ref storage.SeriesRef, lset labels.Labels, t int64, v float64, ct int64) (storage.SeriesRef, error) {
// Fail fast if OOO is disabled and the sample is out of bounds. // Fail fast if OOO is disabled and the sample is out of bounds.
// Otherwise a full check will be done later to decide if the sample is in-order or out-of-order. // Otherwise a full check will be done later to decide if the sample is in-order or out-of-order.
if a.oooTimeWindow == 0 && t < a.minValidTime { if a.oooTimeWindow == 0 && t < a.minValidTime {
@ -401,57 +465,19 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
if t > a.maxt { if t > a.maxt {
a.maxt = t a.maxt = t
} }
if ct > t {
ct = t
}
a.samples = append(a.samples, record.RefSample{ a.samples = append(a.samples, record.RefSample{
Ref: s.ref, Ref: s.ref,
T: t, T: t,
V: v, V: v,
CT: ct,
}) })
a.sampleSeries = append(a.sampleSeries, s) a.sampleSeries = append(a.sampleSeries, s)
return storage.SeriesRef(s.ref), nil return storage.SeriesRef(s.ref), nil
} }
// AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns
// error when sample can't be appended. See
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
if ct >= t {
return 0, storage.ErrCTNewerThanSample
}
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, _, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
}
// Check if CT wouldn't be OOO vs samples we already might have for this series.
// NOTE(bwplotka): This will be often hit as it's expected for long living
// counters to share the same CT.
s.Lock()
isOOO, _, err := s.appendable(ct, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if err != nil {
return 0, err
}
if isOOO {
return storage.SeriesRef(s.ref), storage.ErrOutOfOrderCT
}
if ct > a.maxt {
a.maxt = ct
}
a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
a.sampleSeries = append(a.sampleSeries, s)
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) { func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) {
// Ensure no empty labels have gotten through. // Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty() lset = lset.WithoutEmpty()

View file

@ -6501,6 +6501,85 @@ func TestHeadAppender_AppendCT(t *testing.T) {
} }
} }
func TestHeadAppender_CTinRefSample(t *testing.T) {
type appendableSamples struct {
t int64
v float64
ct int64
}
for _, tc := range []struct {
name string
appendableSamples []appendableSamples
expectedSamples []record.RefSample
}{
{
name: "sample with non-zero ct",
appendableSamples: []appendableSamples{
{t: 100, v: 10, ct: 1},
},
expectedSamples: []record.RefSample{
{T: 100, V: 10, CT: 1, Ref: 1},
},
},
{
name: "sample with ct > t",
appendableSamples: []appendableSamples{
{t: 100, v: 10, ct: 101},
},
expectedSamples: []record.RefSample{
{T: 100, V: 10, CT: 100, Ref: 1},
},
},
{
name: "multiple samples + same ct",
appendableSamples: []appendableSamples{
{t: 100, v: 10, ct: 1},
{t: 101, v: 10, ct: 1},
},
expectedSamples: []record.RefSample{
{T: 100, V: 10, CT: 1, Ref: 1},
{T: 101, V: 10, CT: 1, Ref: 1},
},
},
{
name: "multiple samples + different ct",
appendableSamples: []appendableSamples{
{t: 100, v: 10, ct: 1},
{t: 102, v: 10, ct: 101},
},
expectedSamples: []record.RefSample{
{T: 100, V: 10, CT: 1, Ref: 1},
{T: 102, V: 10, CT: 101, Ref: 1},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
h, w := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
defer func() {
require.NoError(t, h.Close())
}()
a := h.Appender(context.Background())
aWithCT, ok := a.(storage.AppenderWithCT)
require.True(t, ok)
lbls := labels.FromStrings("foo", "bar")
for _, sample := range tc.appendableSamples {
_, err := aWithCT.AppendWithCT(0, lbls, sample.t, sample.v, sample.ct)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
recs := readTestWAL(t, w.Dir())
_, ok = recs[0].([]record.RefSeries)
require.True(t, ok, "expected first record to be a RefSeries")
actualType := reflect.TypeOf(recs[1])
require.Equal(t, reflect.TypeOf([]record.RefSample{}), actualType, "expected second record to be a record.RefSample")
require.Equal(t, tc.expectedSamples, recs[1])
})
}
}
func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) { func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
// Use a chunk range of 1 here so that if we attempted to determine if the head // Use a chunk range of 1 here so that if we attempted to determine if the head
// was compactable using default values for min and max times, `Head.compactable()` // was compactable using default values for min and max times, `Head.compactable()`

View file

@ -153,6 +153,7 @@ type RefSample struct {
Ref chunks.HeadSeriesRef Ref chunks.HeadSeriesRef
T int64 T int64
V float64 V float64
CT int64
} }
// RefMetadata is the metadata associated with a series ID. // RefMetadata is the metadata associated with a series ID.
@ -304,8 +305,9 @@ func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error)
return samples, nil return samples, nil
} }
var ( var (
baseRef = dec.Be64() baseRef = dec.Be64()
baseTime = dec.Be64int64() baseTime = dec.Be64int64()
baseCreatedTime = dec.Be64int64()
) )
// Allow 1 byte for each varint and 8 for the value; the output slice must be at least that big. // Allow 1 byte for each varint and 8 for the value; the output slice must be at least that big.
if minSize := dec.Len() / (1 + 1 + 8); cap(samples) < minSize { if minSize := dec.Len() / (1 + 1 + 8); cap(samples) < minSize {
@ -314,12 +316,14 @@ func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error)
for len(dec.B) > 0 && dec.Err() == nil { for len(dec.B) > 0 && dec.Err() == nil {
dref := dec.Varint64() dref := dec.Varint64()
dtime := dec.Varint64() dtime := dec.Varint64()
dcreatedTime := dec.Varint64()
val := dec.Be64() val := dec.Be64()
samples = append(samples, RefSample{ samples = append(samples, RefSample{
Ref: chunks.HeadSeriesRef(int64(baseRef) + dref), Ref: chunks.HeadSeriesRef(int64(baseRef) + dref),
T: baseTime + dtime, T: baseTime + dtime,
V: math.Float64frombits(val), V: math.Float64frombits(val),
CT: baseCreatedTime + dcreatedTime,
}) })
} }
@ -645,16 +649,20 @@ func (e *Encoder) Samples(samples []RefSample, b []byte) []byte {
return buf.Get() return buf.Get()
} }
// Store base timestamp and base reference number of first sample. // Store base base reference number, timestamp and created timestamp of
// All samples encode their timestamp and ref as delta to those. // first sample. All samples encode their ref, timestamp and created
// timestamp as delta to those.
// TODO(ridwanmsharif): Can the timestamp be encoded as a delta with the CT?
first := samples[0] first := samples[0]
buf.PutBE64(uint64(first.Ref)) buf.PutBE64(uint64(first.Ref))
buf.PutBE64int64(first.T) buf.PutBE64int64(first.T)
buf.PutBE64int64(first.CT)
for _, s := range samples { for _, s := range samples {
buf.PutVarint64(int64(s.Ref) - int64(first.Ref)) buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
buf.PutVarint64(s.T - first.T) buf.PutVarint64(s.T - first.T)
buf.PutVarint64(s.CT - first.CT)
buf.PutBE64(math.Float64bits(s.V)) buf.PutBE64(math.Float64bits(s.V))
} }
return buf.Get() return buf.Get()

View file

@ -183,6 +183,20 @@ func TestRecord_EncodeDecode(t *testing.T) {
require.Equal(t, floatHistograms, decFloatHistograms) require.Equal(t, floatHistograms, decFloatHistograms)
} }
func TestRecord_EncodeDecodeCT(t *testing.T) {
var enc Encoder
dec := NewDecoder(labels.NewSymbolTable())
samples := []RefSample{
{Ref: 0, T: 122, V: 123, CT: 12345},
{Ref: 0, T: 123, V: 123, CT: -12345},
{Ref: 0, T: 123, V: 123, CT: 0},
}
decSamples, err := dec.Samples(enc.Samples(samples, nil), nil)
require.NoError(t, err)
require.Equal(t, samples, decSamples)
}
// TestRecord_Corrupted ensures that corrupted records return the correct error. // TestRecord_Corrupted ensures that corrupted records return the correct error.
// Bugfix check for pull/521 and pull/523. // Bugfix check for pull/521 and pull/523.
func TestRecord_Corrupted(t *testing.T) { func TestRecord_Corrupted(t *testing.T) {

View file

@ -493,7 +493,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
const segments = 1 const segments = 1
const seriesCount = 20 const seriesCount = 20
const samplesCount = 300 const samplesCount = 350
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
@ -569,8 +569,8 @@ func TestCheckpointSeriesReset(t *testing.T) {
compress CompressionType compress CompressionType
segments int segments int
}{ }{
{compress: CompressionNone, segments: 14}, {compress: CompressionNone, segments: 15},
{compress: CompressionSnappy, segments: 13}, {compress: CompressionSnappy, segments: 14},
} }
for _, tc := range testCases { for _, tc := range testCases {