Append created timestamps.

Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>
This commit is contained in:
Arthur Silva Sens 2023-10-12 21:34:41 -03:00
parent f216ddadbc
commit 70be156e4e
No known key found for this signature in database
15 changed files with 587 additions and 22 deletions

View file

@ -206,9 +206,15 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "native-histograms":
c.tsdb.EnableNativeHistograms = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "created-timestamp-ingestion":
c.scrape.EnableCreatedTimestampIngestion = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental created timestamp ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "":
continue
case "promql-at-modifier", "promql-negative-offset":
@ -1448,6 +1454,10 @@ func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels,
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }
func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }

View file

@ -459,7 +459,11 @@ var (
OpenMetricsText0_0_1,
PrometheusText0_0_4,
}
DefaultNativeHistogramScrapeProtocols = []ScrapeProtocol{
// DefaultProtoFirstScrapeProtocols is the set of scrape protocols that favors protobuf
// Prometheus exposition format. Used by default for certain feature-flags like
// "native-histograms" and "created-timestamp-ingestion".
DefaultProtoFirstScrapeProtocols = []ScrapeProtocol{
PrometheusProto,
OpenMetricsText1_0_0,
OpenMetricsText0_0_1,

View file

@ -14,10 +14,18 @@
package scrape
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/rand"
"strings"
"sync"
"testing"
"github.com/gogo/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@ -50,6 +58,10 @@ func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.M
return 0, nil
}
func (a nopAppender) AppendCreatedTimestamp(storage.SeriesRef, labels.Labels, int64) (storage.SeriesRef, error) {
return 0, nil
}
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil }
@ -65,9 +77,19 @@ type histogramSample struct {
fh *histogram.FloatHistogram
}
type collectResultAppendable struct {
*collectResultAppender
}
func (a *collectResultAppendable) Appender(_ context.Context) storage.Appender {
return a
}
// collectResultAppender records all samples that were added through the appender.
// It can be used as its zero value or be backed by another appender it writes samples through.
type collectResultAppender struct {
mtx sync.Mutex
next storage.Appender
resultFloats []floatSample
pendingFloats []floatSample
@ -82,6 +104,8 @@ type collectResultAppender struct {
}
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingFloats = append(a.pendingFloats, floatSample{
metric: lset,
t: t,
@ -103,6 +127,8 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels
}
func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingExemplars = append(a.pendingExemplars, e)
if a.next == nil {
return 0, nil
@ -112,6 +138,8 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
}
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t})
if a.next == nil {
return 0, nil
@ -121,6 +149,8 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.
}
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingMetadata = append(a.pendingMetadata, m)
if ref == 0 {
ref = storage.SeriesRef(rand.Uint64())
@ -132,7 +162,24 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L
return a.next.UpdateMetadata(ref, l, m)
}
func (a *collectResultAppender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingFloats = append(a.pendingFloats, floatSample{
metric: l,
t: t,
f: 0.0,
})
if ref == 0 {
ref = storage.SeriesRef(rand.Uint64())
}
return ref, nil
}
func (a *collectResultAppender) Commit() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.resultFloats = append(a.resultFloats, a.pendingFloats...)
a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
@ -148,6 +195,8 @@ func (a *collectResultAppender) Commit() error {
}
func (a *collectResultAppender) Rollback() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.rolledbackFloats = a.pendingFloats
a.rolledbackHistograms = a.pendingHistograms
a.pendingFloats = nil
@ -171,3 +220,20 @@ func (a *collectResultAppender) String() string {
}
return sb.String()
}
// serializeMetricFamily serializes a MetricFamily into a byte slice.
// Needed because Prometheus has its own implementation of protobuf
// marshalling and unmarshalling that only supports 'encoding=delimited'.
// See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
func serializeMetricFamily(t *testing.T, mf *dto.MetricFamily) []byte {
t.Helper()
buf := &bytes.Buffer{}
protoBuf, err := proto.Marshal(mf)
require.NoError(t, err)
varintBuf := make([]byte, binary.MaxVarintLen32)
varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))
buf.Write(varintBuf[:varintLength])
buf.Write(protoBuf)
return buf.Bytes()
}

View file

@ -78,6 +78,8 @@ type Options struct {
EnableMetadataStorage bool
// Option to increase the interval used by scrape manager to throttle target groups updates.
DiscoveryReloadInterval model.Duration
// Option to enable the ingestion of the created timestamp of a metric.
EnableCreatedTimestampIngestion bool
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption

View file

@ -15,14 +15,22 @@ package scrape
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"testing"
"time"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/config"
@ -30,6 +38,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/util/runutil"
)
func TestPopulateLabels(t *testing.T) {
@ -714,3 +723,135 @@ scrape_configs:
reload(scrapeManager, cfg2)
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
}
func TestManagerScrapeCreatedTimestamp(t *testing.T) {
counterType := dto.MetricType_COUNTER
now := time.Now()
nowMs := now.UnixMilli()
makeMfWithCT := func(ct time.Time) *dto.MetricFamily {
return &dto.MetricFamily{
Name: proto.String("expected_counter"),
Type: &counterType,
Metric: []*dto.Metric{
{
Counter: &dto.Counter{
Value: proto.Float64(1.0),
CreatedTimestamp: timestamppb.New(ct),
},
},
},
}
}
for _, tc := range []struct {
name string
mf *dto.MetricFamily
ingestCT bool
expectedScrapedValues []float64
}{
{
name: "valid counter/Ingestion enabled",
mf: makeMfWithCT(now),
ingestCT: true,
expectedScrapedValues: []float64{0.0, 1.0},
},
{
name: "valid counter/Ingestion disabled",
mf: makeMfWithCT(now),
ingestCT: false,
expectedScrapedValues: []float64{1.0},
},
{
name: "created timestamp older than sample timestamp",
mf: func() *dto.MetricFamily {
mf := makeMfWithCT(now.Add(time.Hour))
mf.Metric[0].TimestampMs = &nowMs
return mf
}(),
ingestCT: true,
expectedScrapedValues: []float64{1.0},
},
} {
t.Run(tc.name, func(t *testing.T) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{EnableCreatedTimestampIngestion: tc.ingestCT},
log.NewLogfmtLogger(os.Stderr),
&collectResultAppendable{app},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(5 * time.Second),
ScrapeTimeout: model.Duration(5 * time.Second),
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
// Start fake HTTP target to scrape returning a single metric.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
w.Write(serializeMetricFamily(t, tc.mf))
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {
{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
},
},
})
scrapeManager.reload()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
if countFloatSamples(app, *tc.mf.Name) < 1 {
return errors.New("expected at least one sample")
}
return nil
}), "after 5 seconds")
scrapeManager.Stop()
require.Equal(t, tc.expectedScrapedValues, getResultFloats(app, *tc.mf.Name))
})
}
}
func countFloatSamples(a *collectResultAppender, expectedMetricName string) (count int) {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, f := range a.resultFloats {
if f.metric.Get(model.MetricNameLabel) == expectedMetricName {
count++
}
}
return count
}
func getResultFloats(app *collectResultAppender, expectedMetricName string) (result []float64) {
app.mtx.Lock()
defer app.mtx.Unlock()
for _, f := range app.resultFloats {
if f.metric.Get(model.MetricNameLabel) == expectedMetricName {
result = append(result, f.f)
}
}
return result
}

View file

@ -31,6 +31,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/types"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
@ -168,6 +169,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.interval,
opts.timeout,
opts.scrapeClassicHistograms,
options.EnableCreatedTimestampIngestion,
options.ExtraMetrics,
options.EnableMetadataStorage,
opts.target,
@ -787,6 +789,7 @@ type scrapeLoop struct {
interval time.Duration
timeout time.Duration
scrapeClassicHistograms bool
scrapeCreatedTimestamps bool
appender func(ctx context.Context) storage.Appender
sampleMutator labelsMutator
@ -1076,6 +1079,7 @@ func newScrapeLoop(ctx context.Context,
interval time.Duration,
timeout time.Duration,
scrapeClassicHistograms bool,
scrapeCreatedTimestamps bool,
reportExtraMetrics bool,
appendMetadataToWAL bool,
target *Target,
@ -1124,6 +1128,7 @@ func newScrapeLoop(ctx context.Context,
interval: interval,
timeout: timeout,
scrapeClassicHistograms: scrapeClassicHistograms,
scrapeCreatedTimestamps: scrapeCreatedTimestamps,
reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL,
metrics: metrics,
@ -1557,6 +1562,18 @@ loop:
updateMetadata(lset, true)
}
if sl.scrapeCreatedTimestamps {
var ct types.Timestamp
if p.CreatedTimestamp(&ct) {
if ctMs := (ct.Seconds * 1000) + int64(ct.Nanos/1_000_000); ctMs < t {
ref, err = app.AppendCreatedTimestamp(ref, lset, ctMs)
if err != nil {
level.Debug(sl.l).Log("msg", "created timestamp not ingested", "reason", err)
}
}
}
}
if isHistogram {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)

View file

@ -660,6 +660,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
false,
false,
false,
false,
nil,
false,
newTestScrapeMetrics(t),
@ -801,6 +802,7 @@ func TestScrapeLoopRun(t *testing.T) {
false,
false,
false,
false,
nil,
false,
scrapeMetrics,
@ -945,6 +947,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
false,
false,
false,
false,
nil,
false,
scrapeMetrics,
@ -2377,7 +2380,7 @@ func TestTargetScraperScrapeOK(t *testing.T) {
runTest(acceptHeader(config.DefaultScrapeProtocols))
protobufParsing = true
runTest(acceptHeader(config.DefaultNativeHistogramScrapeProtocols))
runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols))
}
func TestTargetScrapeScrapeCancel(t *testing.T) {

View file

@ -202,6 +202,20 @@ func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metada
return ref, nil
}
func (f *fanoutAppender) AppendCreatedTimestamp(ref SeriesRef, l labels.Labels, t int64) (SeriesRef, error) {
ref, err := f.primary.AppendCreatedTimestamp(ref, l, t)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.AppendCreatedTimestamp(ref, l, t); err != nil {
return 0, err
}
}
return ref, nil
}
func (f *fanoutAppender) Commit() (err error) {
err = f.primary.Commit()

View file

@ -237,6 +237,7 @@ type Appender interface {
ExemplarAppender
HistogramAppender
MetadataUpdater
CreatedTimestampAppender
}
// GetRef is an extra interface on Appenders used by downstream projects
@ -294,6 +295,20 @@ type MetadataUpdater interface {
UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error)
}
// CreatedTimestampAppender provides an interface for appending created timestamps to the storage.
type CreatedTimestampAppender interface {
// AppendCreatedTimestamp adds an extra sample to the given series labels.
// The value of the appended sample is always zero, while the sample's timestamp
// is the one exposed by the target as created timestamp.
//
// Appending created timestamps is optional, that is because appending sythetic zeros
// should only happen if created timestamp respects the order of the samples, i.e. is not out-of-order.
//
// When AppendCreatedTimestamp decides to not append a sample, it should return a warning that can be
// logged by the caller.
AppendCreatedTimestamp(ref SeriesRef, l labels.Labels, t int64) (SeriesRef, error)
}
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool

View file

@ -303,6 +303,11 @@ func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels,
return 0, nil
}
func (t *timestampTracker) AppendCreatedTimestamp(_ storage.SeriesRef, _ labels.Labels, _ int64) (storage.SeriesRef, error) {
// AppendCreatedTimestamp is no-op for remote-write for now.
return 0, nil
}
// Commit implements storage.Appender.
func (t *timestampTracker) Commit() error {
t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)

View file

@ -339,3 +339,8 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _
// UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now.
return 0, nil
}
func (m *mockAppendable) AppendCreatedTimestamp(_ storage.SeriesRef, _ labels.Labels, _ int64) (storage.SeriesRef, error) {
// AppendCreatedTimestamp is no-op for remote-write for now.
return 0, nil
}

View file

@ -954,6 +954,11 @@ func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Met
return 0, nil
}
// AppendCreatedTimestamp wasn't implemented for agent mode, yet.
func (a *appender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) {
return 0, nil
}
// Commit submits the collected samples and purges the batch.
func (a *appender) Commit() error {
if err := a.log(); err != nil {

View file

@ -87,6 +87,17 @@ func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m
return a.app.UpdateMetadata(ref, l, m)
}
func (a *initAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels.Labels, t int64) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.AppendCreatedTimestamp(ref, lset, t)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendCreatedTimestamp(ref, lset, t)
}
// initTime initializes a head with the first timestamp. This only needs to be called
// for a completely fresh head with an empty WAL.
func (h *Head) initTime(t int64) {
@ -319,28 +330,11 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
}
var created bool
var err error
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
s, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
if created {
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
Labels: lset,
})
}
}
if value.IsStaleNaN(v) {
@ -389,6 +383,70 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
return storage.SeriesRef(s.ref), nil
}
// AppendCreatedTimestamp appends a sample with 0 as its value when it makes sense to do so.
// For instance, it's not safe or efficient to append out-of-order created
// timestamp (e.g. we don't know if we didn't append zero for this created timestamp already).
func (a *headAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels.Labels, t int64) (storage.SeriesRef, error) {
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
}
s.Lock()
isOOO, _, err := s.appendable(t, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if err != nil {
return 0, err
}
if isOOO {
return storage.SeriesRef(s.ref), nil
}
if t > a.maxt {
a.maxt = t
}
a.samples = append(a.samples, record.RefSample{
Ref: s.ref,
T: t,
V: 0.0,
})
a.sampleSeries = append(a.sampleSeries, s)
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return nil, errors.Wrap(ErrInvalidSample, "empty labelset")
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return nil, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
}
var created bool
var err error
s, created, err := a.head.getOrCreate(lset.Hash(), lset)
if err != nil {
return nil, err
}
if created {
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
Labels: lset,
})
}
return s, nil
}
// appendable checks whether the given sample is valid for appending to the series. (if we return false and no error)
// The sample belongs to the out of order chunk if we return true and no error.
// An error signifies the sample cannot be handled.

View file

@ -5641,3 +5641,186 @@ func TestPostingsCardinalityStats(t *testing.T) {
// Using cache.
require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1))
}
func TestAppendCreatedTimestamps(t *testing.T) {
testCases := []struct {
name string
appendFunc func(*testing.T, storage.Appender)
assertFunc func(*testing.T, storage.Querier)
}{
{
name: "In order ct+normal sample",
appendFunc: func(t *testing.T, a storage.Appender) {
lbls := labels.FromStrings("foo", "bar")
ts := int64(100)
_, err := a.AppendCreatedTimestamp(0, lbls, ts-1)
require.NoError(t, err)
_, err = a.Append(0, lbls, ts, 10)
require.NoError(t, err)
require.NoError(t, a.Commit())
},
assertFunc: func(t *testing.T, q storage.Querier) {
ts := int64(100)
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.True(t, ss.Next())
s := ss.At()
it := s.Iterator(nil)
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value := it.At()
require.Equal(t, ts-1, timestamp)
require.Equal(t, 0.0, value)
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value = it.At()
require.Equal(t, ts, timestamp)
require.Equal(t, 10.0, value)
require.False(t, ss.Next())
require.Equal(t, chunkenc.ValNone, it.Next())
},
},
{
name: "Consecutive appends with same ct ignore ct",
appendFunc: func(t *testing.T, a storage.Appender) {
lbls := labels.FromStrings("foo", "bar")
ctTs := int64(99)
sampleTs := int64(100)
_, err := a.AppendCreatedTimestamp(0, lbls, ctTs)
require.NoError(t, err)
_, err = a.Append(0, lbls, sampleTs, 10)
require.NoError(t, err)
sampleTs += 1
_, err = a.AppendCreatedTimestamp(0, lbls, ctTs)
require.NoError(t, err)
_, err = a.Append(0, lbls, sampleTs, 10)
require.NoError(t, err)
require.NoError(t, a.Commit())
},
assertFunc: func(t *testing.T, q storage.Querier) {
ctTs := int64(99)
sampleTs := int64(100)
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator(nil)
// First CT is ingested
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value := it.At()
require.Equal(t, ctTs, timestamp)
require.Equal(t, 0.0, value)
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value = it.At()
require.Equal(t, sampleTs, timestamp)
require.Equal(t, 10.0, value)
// On a consecutive scrape with the same CT, the CT is ignored
sampleTs += 1
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value = it.At()
require.Equal(t, sampleTs, timestamp)
require.Equal(t, 10.0, value)
require.False(t, ss.Next())
require.Equal(t, chunkenc.ValNone, it.Next())
},
},
{
name: "Consecutive appends with newer ct do not ignore ct",
appendFunc: func(t *testing.T, a storage.Appender) {
lbls := labels.FromStrings("foo", "bar")
ctTs := int64(99)
sampleTs := int64(100)
_, err := a.AppendCreatedTimestamp(0, lbls, ctTs)
require.NoError(t, err)
_, err = a.Append(0, lbls, sampleTs, 10)
require.NoError(t, err)
ctTs = sampleTs + 1
sampleTs = ctTs + 1
_, err = a.AppendCreatedTimestamp(0, lbls, ctTs)
require.NoError(t, err)
_, err = a.Append(0, lbls, sampleTs, 10)
require.NoError(t, err)
require.NoError(t, a.Commit())
},
assertFunc: func(t *testing.T, q storage.Querier) {
ctTs := int64(99)
sampleTs := int64(100)
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator(nil)
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value := it.At()
require.Equal(t, ctTs, timestamp)
require.Equal(t, 0.0, value)
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value = it.At()
require.Equal(t, sampleTs, timestamp)
require.Equal(t, 10.0, value)
// Second CT is younger than previous sample, so it is not ignored
ctTs = sampleTs + 1
sampleTs = ctTs + 1
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value = it.At()
require.Equal(t, ctTs, timestamp)
require.Equal(t, 0.0, value)
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value = it.At()
require.Equal(t, sampleTs, timestamp)
require.Equal(t, 10.0, value)
require.False(t, ss.Next())
require.Equal(t, chunkenc.ValNone, it.Next())
},
},
{
name: "CT equals to previous sample timestamp is ignored",
appendFunc: func(t *testing.T, a storage.Appender) {
lbls := labels.FromStrings("foo", "bar")
sampleTs := int64(100)
_, err := a.Append(0, lbls, sampleTs, 10)
require.NoError(t, err)
_, err = a.AppendCreatedTimestamp(0, lbls, sampleTs)
require.NoError(t, err)
require.NoError(t, a.Commit())
},
assertFunc: func(t *testing.T, q storage.Querier) {
sampleTs := int64(100)
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator(nil)
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value := it.At()
require.Equal(t, sampleTs, timestamp)
require.Equal(t, 10.0, value)
require.False(t, ss.Next())
require.Equal(t, chunkenc.ValNone, it.Next())
},
},
}
for _, tc := range testCases {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
defer func() {
require.NoError(t, h.Close())
}()
a := h.Appender(context.Background())
tc.appendFunc(t, a)
q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
tc.assertFunc(t, q)
}
}

37
util/runutil/runutil.go Normal file
View file

@ -0,0 +1,37 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copied from https://github.com/efficientgo/core/blob/a21078e2c723b69e05f95c65dbc5058712b4edd8/runutil/runutil.go#L39
// and adjusted.
package runutil
import "time"
// Retry executes f every interval seconds until timeout or no error is returned from f.
func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
tick := time.NewTicker(interval)
defer tick.Stop()
var err error
for {
if err = f(); err == nil {
return nil
}
select {
case <-stopc:
return err
case <-tick.C:
}
}
}