mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
Combine Appender.Add and AddFast into a single Append method. (#8489)
This moves the label lookup into TSDB, whilst still keeping the cached-ref optimisation for repeated Appends. This makes the API easier to consume and implement. In particular this change is motivated by the scrape-time-aggregation work, which I don't think is possible to implement without it as it needs access to label values. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
parent
404d85f7a8
commit
7369561305
|
@ -1105,12 +1105,10 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
|
|||
|
||||
type notReadyAppender struct{}
|
||||
|
||||
func (n notReadyAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
func (n notReadyAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
return 0, tsdb.ErrNotReady
|
||||
}
|
||||
|
||||
func (n notReadyAppender) AddFast(ref uint64, t int64, v float64) error { return tsdb.ErrNotReady }
|
||||
|
||||
func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }
|
||||
|
||||
func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
|
||||
|
|
|
@ -265,11 +265,11 @@ func TestTimeMetrics(t *testing.T) {
|
|||
))
|
||||
|
||||
app := db.Appender(context.Background())
|
||||
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1)
|
||||
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1)
|
||||
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 3000, 1)
|
||||
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 3000, 1)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
|
|
@ -142,7 +142,7 @@ func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outp
|
|||
l := labels.Labels{}
|
||||
p.Metric(&l)
|
||||
|
||||
if _, err := app.Add(l, *ts, v); err != nil {
|
||||
if _, err := app.Append(0, l, *ts, v); err != nil {
|
||||
return errors.Wrap(err, "add sample")
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
|
@ -206,25 +205,19 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in
|
|||
for _, s := range scrape {
|
||||
s.value += 1000
|
||||
|
||||
if s.ref == nil {
|
||||
ref, err := app.Add(s.labels, ts, float64(s.value))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s.ref = &ref
|
||||
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
|
||||
|
||||
if errors.Cause(err) != storage.ErrNotFound {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ref, err := app.Add(s.labels, ts, float64(s.value))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s.ref = &ref
|
||||
var ref uint64
|
||||
if s.ref != nil {
|
||||
ref = *s.ref
|
||||
}
|
||||
|
||||
ref, err := app.Append(ref, s.labels, ts, float64(s.value))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if s.ref == nil {
|
||||
s.ref = &ref
|
||||
}
|
||||
total++
|
||||
}
|
||||
if err := app.Commit(); err != nil {
|
||||
|
|
|
@ -71,10 +71,8 @@ func BenchmarkRangeQuery(b *testing.B) {
|
|||
a := storage.Appender(context.Background())
|
||||
ts := int64(s * 10000) // 10s interval.
|
||||
for i, metric := range metrics {
|
||||
err := a.AddFast(refs[i], ts, float64(s))
|
||||
if err != nil {
|
||||
refs[i], _ = a.Add(metric, ts, float64(s))
|
||||
}
|
||||
ref, _ := a.Append(refs[i], metric, ts, float64(s))
|
||||
refs[i] = ref
|
||||
}
|
||||
if err := a.Commit(); err != nil {
|
||||
b.Fatal(err)
|
||||
|
|
|
@ -904,14 +904,15 @@ load 1ms
|
|||
// Add some samples with negative timestamp.
|
||||
db := test.TSDB()
|
||||
app := db.Appender(context.Background())
|
||||
ref, err := app.Add(lblsneg, -1000000, 1000)
|
||||
ref, err := app.Append(0, lblsneg, -1000000, 1000)
|
||||
require.NoError(t, err)
|
||||
for ts := int64(-1000000 + 1000); ts <= 0; ts += 1000 {
|
||||
require.NoError(t, app.AddFast(ref, ts, -float64(ts/1000)+1))
|
||||
_, err := app.Append(ref, nil, ts, -float64(ts/1000)+1)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// To test the fix for https://github.com/prometheus/prometheus/issues/8433.
|
||||
_, err = app.Add(labels.FromStrings("__name__", "metric_timestamp"), 3600*1000, 1000)
|
||||
_, err = app.Append(0, labels.FromStrings("__name__", "metric_timestamp"), 3600*1000, 1000)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
|
|
|
@ -43,8 +43,8 @@ func TestDeriv(t *testing.T) {
|
|||
a := storage.Appender(context.Background())
|
||||
|
||||
metric := labels.FromStrings("__name__", "foo")
|
||||
a.Add(metric, 1493712816939, 1.0)
|
||||
a.Add(metric, 1493712846939, 1.0)
|
||||
a.Append(0, metric, 1493712816939, 1.0)
|
||||
a.Append(0, metric, 1493712846939, 1.0)
|
||||
|
||||
require.NoError(t, a.Commit())
|
||||
|
||||
|
|
|
@ -307,7 +307,7 @@ func (cmd *loadCmd) append(a storage.Appender) error {
|
|||
m := cmd.metrics[h]
|
||||
|
||||
for _, s := range smpls {
|
||||
if _, err := a.Add(m, s.T, s.V); err != nil {
|
||||
if _, err := a.Append(0, m, s.T, s.V); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -732,7 +732,7 @@ func (ll *LazyLoader) appendTill(ts int64) error {
|
|||
ll.loadCmd.defs[h] = smpls[i:]
|
||||
break
|
||||
}
|
||||
if _, err := app.Add(m, s.T, s.V); err != nil {
|
||||
if _, err := app.Append(0, m, s.T, s.V); err != nil {
|
||||
return err
|
||||
}
|
||||
if i == len(smpls)-1 {
|
||||
|
|
|
@ -622,7 +622,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
g.seriesInPreviousEval[i] = seriesReturned
|
||||
}()
|
||||
for _, s := range vector {
|
||||
if _, err := app.Add(s.Metric, s.T, s.V); err != nil {
|
||||
if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
|
||||
switch errors.Cause(err) {
|
||||
case storage.ErrOutOfOrderSample:
|
||||
numOutOfOrder++
|
||||
|
@ -647,7 +647,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
|
|||
for metric, lset := range g.seriesInPreviousEval[i] {
|
||||
if _, ok := seriesReturned[metric]; !ok {
|
||||
// Series no longer exposed, mark it stale.
|
||||
_, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
|
||||
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
|
@ -673,7 +673,7 @@ func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
|
|||
app := g.opts.Appendable.Appender(ctx)
|
||||
for _, s := range g.staleSeries {
|
||||
// Rule that produced series no longer configured, mark it stale.
|
||||
_, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
|
||||
_, err := app.Append(0, s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
|
|
|
@ -547,9 +547,9 @@ func TestStaleness(t *testing.T) {
|
|||
|
||||
// A time series that has two samples and then goes stale.
|
||||
app := st.Appender(context.Background())
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
|
||||
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
|
||||
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
|
||||
|
||||
err = app.Commit()
|
||||
require.NoError(t, err)
|
||||
|
@ -872,10 +872,10 @@ func TestNotify(t *testing.T) {
|
|||
})
|
||||
|
||||
app := storage.Appender(context.Background())
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 6000, 0)
|
||||
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3)
|
||||
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3)
|
||||
app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 6000, 0)
|
||||
|
||||
err = app.Commit()
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -28,10 +28,9 @@ func (a nopAppendable) Appender(_ context.Context) storage.Appender {
|
|||
|
||||
type nopAppender struct{}
|
||||
|
||||
func (a nopAppender) Add(labels.Labels, int64, float64) (uint64, error) { return 0, nil }
|
||||
func (a nopAppender) AddFast(uint64, int64, float64) error { return nil }
|
||||
func (a nopAppender) Commit() error { return nil }
|
||||
func (a nopAppender) Rollback() error { return nil }
|
||||
func (a nopAppender) Append(uint64, labels.Labels, int64, float64) (uint64, error) { return 0, nil }
|
||||
func (a nopAppender) Commit() error { return nil }
|
||||
func (a nopAppender) Rollback() error { return nil }
|
||||
|
||||
type sample struct {
|
||||
metric labels.Labels
|
||||
|
@ -46,47 +45,25 @@ type collectResultAppender struct {
|
|||
result []sample
|
||||
pendingResult []sample
|
||||
rolledbackResult []sample
|
||||
|
||||
mapper map[uint64]labels.Labels
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
if a.next == nil {
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
|
||||
err := a.next.AddFast(ref, t, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (a *collectResultAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
a.pendingResult = append(a.pendingResult, sample{
|
||||
metric: a.mapper[ref],
|
||||
metric: lset,
|
||||
t: t,
|
||||
v: v,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) {
|
||||
a.pendingResult = append(a.pendingResult, sample{
|
||||
metric: m,
|
||||
t: t,
|
||||
v: v,
|
||||
})
|
||||
if a.next == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if a.mapper == nil {
|
||||
a.mapper = map[uint64]labels.Labels{}
|
||||
}
|
||||
|
||||
ref, err := a.next.Add(m, t, v)
|
||||
ref, err := a.next.Append(ref, lset, t, v)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
a.mapper[ref] = m
|
||||
return ref, nil
|
||||
|
||||
return ref, err
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Commit() error {
|
||||
|
|
|
@ -1306,20 +1306,19 @@ loop:
|
|||
continue
|
||||
}
|
||||
ce, ok := sl.cache.get(yoloString(met))
|
||||
var (
|
||||
ref uint64
|
||||
lset labels.Labels
|
||||
mets string
|
||||
hash uint64
|
||||
)
|
||||
|
||||
if ok {
|
||||
err = app.AddFast(ce.ref, t, v)
|
||||
_, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
|
||||
// In theory this should never happen.
|
||||
if err == storage.ErrNotFound {
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
var lset labels.Labels
|
||||
|
||||
mets := p.Metric(&lset)
|
||||
hash := lset.Hash()
|
||||
ref = ce.ref
|
||||
lset = ce.lset
|
||||
} else {
|
||||
mets = p.Metric(&lset)
|
||||
hash = lset.Hash()
|
||||
|
||||
// Hash label set as it is seen local to the target. Then add target labels
|
||||
// and relabeling and store the final label set.
|
||||
|
@ -1335,17 +1334,18 @@ loop:
|
|||
err = errNameLabelMandatory
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
var ref uint64
|
||||
ref, err = app.Add(lset, t, v)
|
||||
sampleAdded, err = sl.checkAddError(nil, met, tp, err, &sampleLimitErr, &appErrs)
|
||||
if err != nil {
|
||||
if err != storage.ErrNotFound {
|
||||
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
|
||||
}
|
||||
break loop
|
||||
ref, err = app.Append(ref, lset, t, v)
|
||||
sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
|
||||
if err != nil {
|
||||
if err != storage.ErrNotFound {
|
||||
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
|
||||
if !ok {
|
||||
if tp == nil {
|
||||
// Bypass staleness logic if there is an explicit timestamp.
|
||||
sl.cache.trackStaleness(hash, lset)
|
||||
|
@ -1380,7 +1380,7 @@ loop:
|
|||
if err == nil {
|
||||
sl.cache.forEachStale(func(lset labels.Labels) bool {
|
||||
// Series no longer exposed, mark it stale.
|
||||
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
|
||||
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
|
||||
switch errors.Cause(err) {
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
// Do not count these in logging, as this is expected if a target
|
||||
|
@ -1497,37 +1497,31 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er
|
|||
|
||||
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
|
||||
ce, ok := sl.cache.get(s)
|
||||
var ref uint64
|
||||
var lset labels.Labels
|
||||
if ok {
|
||||
err := app.AddFast(ce.ref, t, v)
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
return nil
|
||||
case storage.ErrNotFound:
|
||||
// Try an Add.
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
// Do not log here, as this is expected if a target goes away and comes back
|
||||
// again with a new scrape loop.
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
ref = ce.ref
|
||||
lset = ce.lset
|
||||
} else {
|
||||
lset = labels.Labels{
|
||||
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
||||
// with scraped metrics in the cache.
|
||||
// We have to drop it when building the actual metric.
|
||||
labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
|
||||
}
|
||||
}
|
||||
lset := labels.Labels{
|
||||
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
||||
// with scraped metrics in the cache.
|
||||
// We have to drop it when building the actual metric.
|
||||
labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
|
||||
lset = sl.reportSampleMutator(lset)
|
||||
}
|
||||
|
||||
hash := lset.Hash()
|
||||
lset = sl.reportSampleMutator(lset)
|
||||
|
||||
ref, err := app.Add(lset, t, v)
|
||||
ref, err := app.Append(ref, lset, t, v)
|
||||
switch errors.Cause(err) {
|
||||
case nil:
|
||||
sl.cache.addRef(s, ref, lset, hash)
|
||||
if !ok {
|
||||
sl.cache.addRef(s, ref, lset, lset.Hash())
|
||||
}
|
||||
return nil
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
// Do not log here, as this is expected if a target goes away and comes back
|
||||
// again with a new scrape loop.
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
|
|
|
@ -1560,7 +1560,7 @@ type errorAppender struct {
|
|||
collectResultAppender
|
||||
}
|
||||
|
||||
func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
func (app *errorAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
switch lset.Get(model.MetricNameLabel) {
|
||||
case "out_of_order":
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
|
@ -1569,14 +1569,10 @@ func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (uint64, e
|
|||
case "out_of_bounds":
|
||||
return 0, storage.ErrOutOfBounds
|
||||
default:
|
||||
return app.collectResultAppender.Add(lset, t, v)
|
||||
return app.collectResultAppender.Append(ref, lset, t, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (app *errorAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
return app.collectResultAppender.AddFast(ref, t, v)
|
||||
}
|
||||
|
||||
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
|
||||
app := &errorAppender{}
|
||||
|
||||
|
|
|
@ -290,57 +290,38 @@ type limitAppender struct {
|
|||
i int
|
||||
}
|
||||
|
||||
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
func (app *limitAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if !value.IsStaleNaN(v) {
|
||||
app.i++
|
||||
if app.i > app.limit {
|
||||
return 0, errSampleLimit
|
||||
}
|
||||
}
|
||||
ref, err := app.Appender.Add(lset, t, v)
|
||||
ref, err := app.Appender.Append(ref, lset, t, v)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (app *limitAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
if !value.IsStaleNaN(v) {
|
||||
app.i++
|
||||
if app.i > app.limit {
|
||||
return errSampleLimit
|
||||
}
|
||||
}
|
||||
err := app.Appender.AddFast(ref, t, v)
|
||||
return err
|
||||
}
|
||||
|
||||
type timeLimitAppender struct {
|
||||
storage.Appender
|
||||
|
||||
maxTime int64
|
||||
}
|
||||
|
||||
func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
func (app *timeLimitAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if t > app.maxTime {
|
||||
return 0, storage.ErrOutOfBounds
|
||||
}
|
||||
|
||||
ref, err := app.Appender.Add(lset, t, v)
|
||||
ref, err := app.Appender.Append(ref, lset, t, v)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (app *timeLimitAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
if t > app.maxTime {
|
||||
return storage.ErrOutOfBounds
|
||||
}
|
||||
err := app.Appender.AddFast(ref, t, v)
|
||||
return err
|
||||
}
|
||||
|
||||
// populateLabels builds a label set from the given label set and scrape configuration.
|
||||
// It returns a label set before relabeling was applied as the second return value.
|
||||
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
|
||||
|
|
|
@ -143,33 +143,20 @@ type fanoutAppender struct {
|
|||
secondaries []Appender
|
||||
}
|
||||
|
||||
func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
ref, err := f.primary.Add(l, t, v)
|
||||
func (f *fanoutAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
ref, err := f.primary.Append(ref, l, t, v)
|
||||
if err != nil {
|
||||
return ref, err
|
||||
}
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if _, err := appender.Add(l, t, v); err != nil {
|
||||
if _, err := appender.Append(ref, l, t, v); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (f *fanoutAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
if err := f.primary.AddFast(ref, t, v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if err := appender.AddFast(ref, t, v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fanoutAppender) Commit() (err error) {
|
||||
err = f.primary.Commit()
|
||||
|
||||
|
|
|
@ -36,11 +36,11 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
priStorage := teststorage.New(t)
|
||||
defer priStorage.Close()
|
||||
app1 := priStorage.Appender(ctx)
|
||||
app1.Add(inputLabel, 0, 0)
|
||||
app1.Append(0, inputLabel, 0, 0)
|
||||
inputTotalSize++
|
||||
app1.Add(inputLabel, 1000, 1)
|
||||
app1.Append(0, inputLabel, 1000, 1)
|
||||
inputTotalSize++
|
||||
app1.Add(inputLabel, 2000, 2)
|
||||
app1.Append(0, inputLabel, 2000, 2)
|
||||
inputTotalSize++
|
||||
err := app1.Commit()
|
||||
require.NoError(t, err)
|
||||
|
@ -48,11 +48,11 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
remoteStorage1 := teststorage.New(t)
|
||||
defer remoteStorage1.Close()
|
||||
app2 := remoteStorage1.Appender(ctx)
|
||||
app2.Add(inputLabel, 3000, 3)
|
||||
app2.Append(0, inputLabel, 3000, 3)
|
||||
inputTotalSize++
|
||||
app2.Add(inputLabel, 4000, 4)
|
||||
app2.Append(0, inputLabel, 4000, 4)
|
||||
inputTotalSize++
|
||||
app2.Add(inputLabel, 5000, 5)
|
||||
app2.Append(0, inputLabel, 5000, 5)
|
||||
inputTotalSize++
|
||||
err = app2.Commit()
|
||||
require.NoError(t, err)
|
||||
|
@ -61,11 +61,11 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
defer remoteStorage2.Close()
|
||||
|
||||
app3 := remoteStorage2.Appender(ctx)
|
||||
app3.Add(inputLabel, 6000, 6)
|
||||
app3.Append(0, inputLabel, 6000, 6)
|
||||
inputTotalSize++
|
||||
app3.Add(inputLabel, 7000, 7)
|
||||
app3.Append(0, inputLabel, 7000, 7)
|
||||
inputTotalSize++
|
||||
app3.Add(inputLabel, 8000, 8)
|
||||
app3.Append(0, inputLabel, 8000, 8)
|
||||
inputTotalSize++
|
||||
|
||||
err = app3.Commit()
|
||||
|
|
|
@ -136,18 +136,15 @@ func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier,
|
|||
//
|
||||
// Operations on the Appender interface are not goroutine-safe.
|
||||
type Appender interface {
|
||||
// Add adds a sample pair for the given series. A reference number is
|
||||
// returned which can be used to add further samples in the same or later
|
||||
// transactions.
|
||||
// Append adds a sample pair for the given series.
|
||||
// An optional reference number can be provided to accelerate calls.
|
||||
// A reference number is returned which can be used to add further
|
||||
// samples in the same or later transactions.
|
||||
// Returned reference numbers are ephemeral and may be rejected in calls
|
||||
// to AddFast() at any point. Adding the sample via Add() returns a new
|
||||
// to Append() at any point. Adding the sample via Append() returns a new
|
||||
// reference number.
|
||||
// If the reference is 0 it must not be used for caching.
|
||||
Add(l labels.Labels, t int64, v float64) (uint64, error)
|
||||
|
||||
// AddFast adds a sample pair for the referenced series. It is generally
|
||||
// faster than adding a sample by providing its full label set.
|
||||
AddFast(ref uint64, t int64, v float64) error
|
||||
Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error)
|
||||
|
||||
// Commit submits the collected samples and purges the batch. If Commit
|
||||
// returns a non-nil error, it also rolls back all modifications made in
|
||||
|
|
|
@ -212,8 +212,8 @@ type timestampTracker struct {
|
|||
highestRecvTimestamp *maxTimestamp
|
||||
}
|
||||
|
||||
// Add implements storage.Appender.
|
||||
func (t *timestampTracker) Add(_ labels.Labels, ts int64, _ float64) (uint64, error) {
|
||||
// Append implements storage.Appender.
|
||||
func (t *timestampTracker) Append(_ uint64, _ labels.Labels, ts int64, _ float64) (uint64, error) {
|
||||
t.samples++
|
||||
if ts > t.highestTimestamp {
|
||||
t.highestTimestamp = ts
|
||||
|
@ -221,12 +221,6 @@ func (t *timestampTracker) Add(_ labels.Labels, ts int64, _ float64) (uint64, er
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
// AddFast implements storage.Appender.
|
||||
func (t *timestampTracker) AddFast(_ uint64, ts int64, v float64) error {
|
||||
_, err := t.Add(nil, ts, v)
|
||||
return err
|
||||
}
|
||||
|
||||
// Commit implements storage.Appender.
|
||||
func (t *timestampTracker) Commit() error {
|
||||
t.writeStorage.samplesIn.incr(t.samples)
|
||||
|
|
|
@ -75,7 +75,7 @@ func (h *handler) write(ctx context.Context, req *prompb.WriteRequest) (err erro
|
|||
for _, ts := range req.Timeseries {
|
||||
labels := labelProtosToLabels(ts.Labels)
|
||||
for _, s := range ts.Samples {
|
||||
_, err = app.Add(labels, s.Timestamp, s.Value)
|
||||
_, err = app.Append(0, labels, s.Timestamp, s.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -115,7 +115,7 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
|
|||
return m
|
||||
}
|
||||
|
||||
func (m *mockAppendable) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
func (m *mockAppendable) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if t < m.latest {
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
}
|
||||
|
@ -129,10 +129,6 @@ func (m *mockAppendable) Commit() error {
|
|||
return m.commitErr
|
||||
}
|
||||
|
||||
func (*mockAppendable) AddFast(uint64, int64, float64) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (*mockAppendable) Rollback() error {
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
|
|
@ -448,15 +448,10 @@ func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir str
|
|||
for _, s := range series {
|
||||
ref := uint64(0)
|
||||
it := s.Iterator()
|
||||
lset := s.Labels()
|
||||
for it.Next() {
|
||||
t, v := it.At()
|
||||
if ref != 0 {
|
||||
err := app.AddFast(ref, t, v)
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
ref, err = app.Add(s.Labels(), t, v)
|
||||
ref, err = app.Append(ref, lset, t, v)
|
||||
require.NoError(tb, err)
|
||||
}
|
||||
require.NoError(tb, it.Err())
|
||||
|
|
|
@ -43,10 +43,10 @@ func TestBlockWriter(t *testing.T) {
|
|||
// Add some series.
|
||||
app := w.Appender(ctx)
|
||||
ts1, v1 := int64(44), float64(7)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts1, v1)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, ts1, v1)
|
||||
require.NoError(t, err)
|
||||
ts2, v2 := int64(55), float64(12)
|
||||
_, err = app.Add(labels.Labels{{Name: "c", Value: "d"}}, ts2, v2)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "c", Value: "d"}}, ts2, v2)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
id, err := w.Flush(ctx)
|
||||
|
|
|
@ -1101,7 +1101,7 @@ func BenchmarkCompactionFromHead(b *testing.B) {
|
|||
for ln := 0; ln < labelNames; ln++ {
|
||||
app := h.Appender(context.Background())
|
||||
for lv := 0; lv < labelValues; lv++ {
|
||||
app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0)
|
||||
app.Append(0, labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0)
|
||||
}
|
||||
require.NoError(b, app.Commit())
|
||||
}
|
||||
|
@ -1134,9 +1134,9 @@ func TestDisableAutoCompactions(t *testing.T) {
|
|||
db.DisableCompactions()
|
||||
app := db.Appender(context.Background())
|
||||
for i := int64(0); i < 3; i++ {
|
||||
_, err := app.Add(label, i*blockRange, 0)
|
||||
_, err := app.Append(0, label, i*blockRange, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(label, i*blockRange+1000, 0)
|
||||
_, err = app.Append(0, label, i*blockRange+1000, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1249,11 +1249,11 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
|
|||
|
||||
// Add some data to the head that is enough to trigger a compaction.
|
||||
app := db.Appender(context.Background())
|
||||
_, err := app.Add(defaultLabel, 1, 0)
|
||||
_, err := app.Append(0, defaultLabel, 1, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(defaultLabel, 2, 0)
|
||||
_, err = app.Append(0, defaultLabel, 2, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
|
||||
_, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
|
143
tsdb/db_test.go
143
tsdb/db_test.go
|
@ -173,7 +173,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
querier, err := db.Querier(context.TODO(), 0, 1)
|
||||
|
@ -206,7 +206,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) {
|
|||
{
|
||||
for {
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), maxt, 0)
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), maxt, 0)
|
||||
expSamples = append(expSamples, sample{t: maxt, v: 0})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -261,7 +261,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
|
|||
}()
|
||||
|
||||
app := db.Appender(context.Background())
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = app.Rollback()
|
||||
|
@ -285,12 +285,13 @@ func TestDBAppenderAddRef(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
app1 := db.Appender(ctx)
|
||||
|
||||
ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
|
||||
ref1, err := app1.Append(0, labels.FromStrings("a", "b"), 123, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Reference should already work before commit.
|
||||
err = app1.AddFast(ref1, 124, 1)
|
||||
ref2, err := app1.Append(ref1, nil, 124, 1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ref1, ref2)
|
||||
|
||||
err = app1.Commit()
|
||||
require.NoError(t, err)
|
||||
|
@ -298,20 +299,22 @@ func TestDBAppenderAddRef(t *testing.T) {
|
|||
app2 := db.Appender(ctx)
|
||||
|
||||
// first ref should already work in next transaction.
|
||||
err = app2.AddFast(ref1, 125, 0)
|
||||
ref3, err := app2.Append(ref1, nil, 125, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ref1, ref3)
|
||||
|
||||
ref2, err := app2.Add(labels.FromStrings("a", "b"), 133, 1)
|
||||
ref4, err := app2.Append(ref1, labels.FromStrings("a", "b"), 133, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, ref1, ref2)
|
||||
require.Equal(t, ref1, ref4)
|
||||
|
||||
// Reference must be valid to add another sample.
|
||||
err = app2.AddFast(ref2, 143, 2)
|
||||
ref5, err := app2.Append(ref2, nil, 143, 2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ref1, ref5)
|
||||
|
||||
err = app2.AddFast(9999999, 1, 1)
|
||||
require.Equal(t, storage.ErrNotFound, errors.Cause(err))
|
||||
// Missing labels & invalid refs should fail.
|
||||
_, err = app2.Append(9999999, nil, 1, 1)
|
||||
require.Equal(t, ErrInvalidSample, errors.Cause(err))
|
||||
|
||||
require.NoError(t, app2.Commit())
|
||||
|
||||
|
@ -340,11 +343,11 @@ func TestAppendEmptyLabelsIgnored(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
app1 := db.Appender(ctx)
|
||||
|
||||
ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
|
||||
ref1, err := app1.Append(0, labels.FromStrings("a", "b"), 123, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Construct labels manually so there is an empty label.
|
||||
ref2, err := app1.Add(labels.Labels{labels.Label{Name: "a", Value: "b"}, labels.Label{Name: "c", Value: ""}}, 124, 0)
|
||||
ref2, err := app1.Append(0, labels.Labels{labels.Label{Name: "a", Value: "b"}, labels.Label{Name: "c", Value: ""}}, 124, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Should be the same series.
|
||||
|
@ -396,7 +399,7 @@ Outer:
|
|||
smpls := make([]float64, numSamples)
|
||||
for i := int64(0); i < numSamples; i++ {
|
||||
smpls[i] = rand.Float64()
|
||||
app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||
app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||
}
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -452,12 +455,12 @@ func TestAmendDatapointCausesError(t *testing.T) {
|
|||
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 0)
|
||||
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
|
||||
require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err)
|
||||
require.NoError(t, app.Rollback())
|
||||
}
|
||||
|
@ -470,12 +473,12 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
|
|||
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
|
||||
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -487,12 +490,12 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
|
|||
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001))
|
||||
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002))
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002))
|
||||
require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err)
|
||||
}
|
||||
|
||||
|
@ -504,7 +507,7 @@ func TestEmptyLabelsetCausesError(t *testing.T) {
|
|||
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{}, 0, 0)
|
||||
_, err := app.Append(0, labels.Labels{}, 0, 0)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, "empty labelset: invalid sample", err.Error())
|
||||
}
|
||||
|
@ -518,9 +521,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
|||
// Append AmendedValue.
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
|
||||
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 2)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -536,9 +539,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
|||
|
||||
// Append Out of Order Value.
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 10, 3)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 7, 5)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -560,7 +563,7 @@ func TestDB_Snapshot(t *testing.T) {
|
|||
app := db.Appender(ctx)
|
||||
mint := int64(1414141414000)
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -610,7 +613,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
|
|||
app := db.Appender(ctx)
|
||||
mint := int64(1414141414000)
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -665,7 +668,7 @@ func TestDB_SnapshotWithDelete(t *testing.T) {
|
|||
smpls := make([]float64, numSamples)
|
||||
for i := int64(0); i < numSamples; i++ {
|
||||
smpls[i] = rand.Float64()
|
||||
app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||
app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||
}
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -818,7 +821,7 @@ func TestDB_e2e(t *testing.T) {
|
|||
|
||||
series = append(series, sample{ts, v})
|
||||
|
||||
_, err := app.Add(lset, ts, v)
|
||||
_, err := app.Append(0, lset, ts, v)
|
||||
require.NoError(t, err)
|
||||
|
||||
ts += rand.Int63n(timeInterval) + 1
|
||||
|
@ -912,7 +915,7 @@ func TestWALFlushedOnDBClose(t *testing.T) {
|
|||
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(lbls, 0, 1)
|
||||
_, err := app.Append(0, lbls, 0, 1)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -986,10 +989,10 @@ func TestWALSegmentSizeOptions(t *testing.T) {
|
|||
|
||||
for i := int64(0); i < 155; i++ {
|
||||
app := db.Appender(context.Background())
|
||||
ref, err := app.Add(labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64())
|
||||
ref, err := app.Append(0, labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
for j := int64(1); j <= 78; j++ {
|
||||
err := app.AddFast(ref, i+j, rand.Float64())
|
||||
_, err := app.Append(ref, nil, i+j, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1013,7 +1016,7 @@ func TestTombstoneClean(t *testing.T) {
|
|||
smpls := make([]float64, numSamples)
|
||||
for i := int64(0); i < numSamples; i++ {
|
||||
smpls[i] = rand.Float64()
|
||||
app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||
app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||
}
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1356,7 +1359,7 @@ func TestSizeRetention(t *testing.T) {
|
|||
it := s.Iterator()
|
||||
for it.Next() {
|
||||
tim, v := it.At()
|
||||
_, err := headApp.Add(s.Labels(), tim, v)
|
||||
_, err := headApp.Append(0, s.Labels(), tim, v)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, it.Err())
|
||||
|
@ -1472,7 +1475,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
for _, lbls := range labelpairs {
|
||||
_, err := app.Add(lbls, 0, 1)
|
||||
_, err := app.Append(0, lbls, 0, 1)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1657,9 +1660,9 @@ func TestChunkAtBlockBoundary(t *testing.T) {
|
|||
label := labels.FromStrings("foo", "bar")
|
||||
|
||||
for i := int64(0); i < 3; i++ {
|
||||
_, err := app.Add(label, i*blockRange, 0)
|
||||
_, err := app.Append(0, label, i*blockRange, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(label, i*blockRange+1000, 0)
|
||||
_, err = app.Append(0, label, i*blockRange+1000, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -1714,9 +1717,9 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
|
|||
label := labels.FromStrings("foo", "bar")
|
||||
|
||||
for i := int64(0); i < 5; i++ {
|
||||
_, err := app.Add(label, i*blockRange, 0)
|
||||
_, err := app.Append(0, label, i*blockRange, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.FromStrings("blockID", strconv.FormatInt(i, 10)), i*blockRange, 0)
|
||||
_, err = app.Append(0, labels.FromStrings("blockID", strconv.FormatInt(i, 10)), i*blockRange, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -1763,7 +1766,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
|||
// First added sample initializes the writable range.
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), 1000, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, int64(1000), db.head.MinTime())
|
||||
|
@ -1880,11 +1883,11 @@ func TestNoEmptyBlocks(t *testing.T) {
|
|||
|
||||
t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
|
||||
app := db.Appender(ctx)
|
||||
_, err := app.Add(defaultLabel, 1, 0)
|
||||
_, err := app.Append(0, defaultLabel, 1, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(defaultLabel, 2, 0)
|
||||
_, err = app.Append(0, defaultLabel, 2, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
|
||||
_, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.NoError(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
|
||||
|
@ -1897,16 +1900,16 @@ func TestNoEmptyBlocks(t *testing.T) {
|
|||
require.Equal(t, 0, len(actBlocks))
|
||||
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(defaultLabel, 1, 0)
|
||||
_, err = app.Append(0, defaultLabel, 1, 0)
|
||||
require.Equal(t, storage.ErrOutOfBounds, err, "the head should be truncated so no samples in the past should be allowed")
|
||||
|
||||
// Adding new blocks.
|
||||
currentTime := db.Head().MaxTime()
|
||||
_, err = app.Add(defaultLabel, currentTime, 0)
|
||||
_, err = app.Append(0, defaultLabel, currentTime, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(defaultLabel, currentTime+1, 0)
|
||||
_, err = app.Append(0, defaultLabel, currentTime+1, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
|
||||
_, err = app.Append(0, defaultLabel, currentTime+rangeToTriggerCompaction, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -1923,11 +1926,11 @@ func TestNoEmptyBlocks(t *testing.T) {
|
|||
oldBlocks := db.Blocks()
|
||||
app := db.Appender(ctx)
|
||||
currentTime := db.Head().MaxTime()
|
||||
_, err := app.Add(defaultLabel, currentTime, 0)
|
||||
_, err := app.Append(0, defaultLabel, currentTime, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(defaultLabel, currentTime+1, 0)
|
||||
_, err = app.Append(0, defaultLabel, currentTime+1, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
|
||||
_, err = app.Append(0, defaultLabel, currentTime+rangeToTriggerCompaction, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.NoError(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
|
||||
|
@ -2009,7 +2012,7 @@ func TestDB_LabelNames(t *testing.T) {
|
|||
for i := mint; i <= maxt; i++ {
|
||||
for _, tuple := range sampleLabels {
|
||||
label := labels.FromStrings(tuple[0], tuple[1])
|
||||
_, err := app.Add(label, i*blockRange, 0)
|
||||
_, err := app.Append(0, label, i*blockRange, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
@ -2076,7 +2079,7 @@ func TestCorrectNumTombstones(t *testing.T) {
|
|||
app := db.Appender(ctx)
|
||||
for i := int64(0); i < 3; i++ {
|
||||
for j := int64(0); j < 15; j++ {
|
||||
_, err := app.Add(defaultLabel, i*blockRange+j, 0)
|
||||
_, err := app.Append(0, defaultLabel, i*blockRange+j, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
@ -2129,16 +2132,16 @@ func TestBlockRanges(t *testing.T) {
|
|||
}()
|
||||
app := db.Appender(ctx)
|
||||
lbl := labels.Labels{{Name: "a", Value: "b"}}
|
||||
_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
|
||||
_, err = app.Append(0, lbl, firstBlockMaxT-1, rand.Float64())
|
||||
if err == nil {
|
||||
t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
|
||||
}
|
||||
_, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64())
|
||||
_, err = app.Append(0, lbl, firstBlockMaxT+1, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64())
|
||||
_, err = app.Append(0, lbl, firstBlockMaxT+2, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
secondBlockMaxt := firstBlockMaxT + rangeToTriggerCompaction
|
||||
_, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction
|
||||
_, err = app.Append(0, lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -2158,13 +2161,13 @@ func TestBlockRanges(t *testing.T) {
|
|||
// and compaction doesn't create an overlapping block.
|
||||
app = db.Appender(ctx)
|
||||
db.DisableCompactions()
|
||||
_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
|
||||
_, err = app.Append(0, lbl, secondBlockMaxt+1, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64())
|
||||
_, err = app.Append(0, lbl, secondBlockMaxt+2, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64())
|
||||
_, err = app.Append(0, lbl, secondBlockMaxt+3, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64())
|
||||
_, err = app.Append(0, lbl, secondBlockMaxt+4, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.NoError(t, db.Close())
|
||||
|
@ -2180,7 +2183,7 @@ func TestBlockRanges(t *testing.T) {
|
|||
require.Equal(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")
|
||||
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction
|
||||
_, err = app.Append(0, lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
for x := 0; x < 100; x++ {
|
||||
|
@ -2247,7 +2250,7 @@ func TestDBReadOnly(t *testing.T) {
|
|||
dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir())
|
||||
require.NoError(t, err)
|
||||
app := dbWritable.Appender(context.Background())
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -2348,7 +2351,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
|||
app := db.Appender(ctx)
|
||||
maxt = 1000
|
||||
for i := 0; i < maxt; i++ {
|
||||
_, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
|
||||
_, err := app.Append(0, labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -2418,7 +2421,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
|
|||
app := db.Appender(ctx)
|
||||
|
||||
for j := 0; j < 100; j++ {
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
err = app.Commit()
|
||||
|
@ -2483,7 +2486,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
|||
|
||||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
querierAfterAddButBeforeCommit, err := db.Querier(context.Background(), 0, 1000000)
|
||||
|
@ -2808,7 +2811,7 @@ func TestCompactHead(t *testing.T) {
|
|||
maxt := 100
|
||||
for i := 0; i < maxt; i++ {
|
||||
val := rand.Float64()
|
||||
_, err := app.Add(labels.FromStrings("a", "b"), int64(i), val)
|
||||
_, err := app.Append(0, labels.FromStrings("a", "b"), int64(i), val)
|
||||
require.NoError(t, err)
|
||||
expSamples = append(expSamples, sample{int64(i), val})
|
||||
}
|
||||
|
@ -3008,9 +3011,9 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
|
|||
// Append samples spanning 59 block ranges.
|
||||
app := db.Appender(context.Background())
|
||||
for i := int64(0); i < 60; i++ {
|
||||
_, err := app.Add(lbls, blockRange*i, rand.Float64())
|
||||
_, err := app.Append(0, lbls, blockRange*i, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(lbls, (blockRange*i)+blockRange/2, rand.Float64())
|
||||
_, err = app.Append(0, lbls, (blockRange*i)+blockRange/2, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
// Rotate the WAL file so that there is >3 files for checkpoint to happen.
|
||||
require.NoError(t, db.head.wal.NextSegment())
|
||||
|
@ -3067,7 +3070,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
|
|||
|
||||
// Adding sample way into the future.
|
||||
app = db.Appender(context.Background())
|
||||
_, err = app.Add(lbls, blockRange*120, rand.Float64())
|
||||
_, err = app.Append(0, lbls, blockRange*120, rand.Float64())
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
|
78
tsdb/head.go
78
tsdb/head.go
|
@ -1052,21 +1052,14 @@ type initAppender struct {
|
|||
head *Head
|
||||
}
|
||||
|
||||
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if a.app != nil {
|
||||
return a.app.Add(lset, t, v)
|
||||
return a.app.Append(ref, lset, t, v)
|
||||
}
|
||||
|
||||
a.head.initTime(t)
|
||||
a.app = a.head.appender()
|
||||
|
||||
return a.app.Add(lset, t, v)
|
||||
}
|
||||
|
||||
func (a *initAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
if a.app == nil {
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
return a.app.AddFast(ref, t, v)
|
||||
return a.app.Append(ref, lset, t, v)
|
||||
}
|
||||
|
||||
func (a *initAppender) Commit() error {
|
||||
|
@ -1178,54 +1171,45 @@ type headAppender struct {
|
|||
closed bool
|
||||
}
|
||||
|
||||
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if t < a.minValidTime {
|
||||
a.head.metrics.outOfBoundSamples.Inc()
|
||||
return 0, storage.ErrOutOfBounds
|
||||
}
|
||||
|
||||
// Ensure no empty labels have gotten through.
|
||||
lset = lset.WithoutEmpty()
|
||||
|
||||
if len(lset) == 0 {
|
||||
return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
|
||||
}
|
||||
|
||||
if l, dup := lset.HasDuplicateLabelNames(); dup {
|
||||
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
|
||||
}
|
||||
|
||||
s, created, err := a.head.getOrCreate(lset.Hash(), lset)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if created {
|
||||
a.series = append(a.series, record.RefSeries{
|
||||
Ref: s.ref,
|
||||
Labels: lset,
|
||||
})
|
||||
}
|
||||
return s.ref, a.AddFast(s.ref, t, v)
|
||||
}
|
||||
|
||||
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
if t < a.minValidTime {
|
||||
a.head.metrics.outOfBoundSamples.Inc()
|
||||
return storage.ErrOutOfBounds
|
||||
}
|
||||
|
||||
s := a.head.series.getByID(ref)
|
||||
if s == nil {
|
||||
return errors.Wrap(storage.ErrNotFound, "unknown series")
|
||||
// Ensure no empty labels have gotten through.
|
||||
lset = lset.WithoutEmpty()
|
||||
if len(lset) == 0 {
|
||||
return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
|
||||
}
|
||||
|
||||
if l, dup := lset.HasDuplicateLabelNames(); dup {
|
||||
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
|
||||
}
|
||||
|
||||
var created bool
|
||||
var err error
|
||||
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if created {
|
||||
a.series = append(a.series, record.RefSeries{
|
||||
Ref: s.ref,
|
||||
Labels: lset,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
if err := s.appendable(t, v); err != nil {
|
||||
s.Unlock()
|
||||
if err == storage.ErrOutOfOrderSample {
|
||||
a.head.metrics.outOfOrderSamples.Inc()
|
||||
}
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
s.pendingCommit = true
|
||||
s.Unlock()
|
||||
|
@ -1238,12 +1222,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
|||
}
|
||||
|
||||
a.samples = append(a.samples, record.RefSample{
|
||||
Ref: ref,
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
V: v,
|
||||
})
|
||||
a.sampleSeries = append(a.sampleSeries, s)
|
||||
return nil
|
||||
return s.ref, nil
|
||||
}
|
||||
|
||||
func (a *headAppender) log() error {
|
||||
|
|
|
@ -275,14 +275,14 @@ func TestHead_WALMultiRef(t *testing.T) {
|
|||
require.NoError(t, head.Init(0))
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1)
|
||||
ref1, err := app.Append(0, labels.FromStrings("foo", "bar"), 100, 1)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
|
||||
|
||||
// Add another sample outside chunk range to mmap a chunk.
|
||||
app = head.Appender(context.Background())
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2)
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 1500, 2)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
|
||||
|
@ -290,14 +290,14 @@ func TestHead_WALMultiRef(t *testing.T) {
|
|||
require.NoError(t, head.Truncate(1600))
|
||||
|
||||
app = head.Appender(context.Background())
|
||||
ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3)
|
||||
ref2, err := app.Append(0, labels.FromStrings("foo", "bar"), 1700, 3)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
|
||||
|
||||
// Add another sample outside chunk range to mmap a chunk.
|
||||
app = head.Appender(context.Background())
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4)
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 2000, 4)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, 4.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
|
||||
|
@ -569,7 +569,7 @@ func TestHeadDeleteSimple(t *testing.T) {
|
|||
|
||||
app := head.Appender(context.Background())
|
||||
for _, smpl := range smplsAll {
|
||||
_, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
|
||||
_, err := app.Append(0, labels.Labels{lblDefault}, smpl.t, smpl.v)
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
|
@ -583,7 +583,7 @@ func TestHeadDeleteSimple(t *testing.T) {
|
|||
// Add more samples.
|
||||
app = head.Appender(context.Background())
|
||||
for _, smpl := range c.addSamples {
|
||||
_, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
|
||||
_, err := app.Append(0, labels.Labels{lblDefault}, smpl.t, smpl.v)
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
|
@ -655,7 +655,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
|
|||
smpls := make([]float64, numSamples)
|
||||
for i := int64(0); i < numSamples; i++ {
|
||||
smpls[i] = rand.Float64()
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -676,7 +676,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
|
|||
|
||||
// Add again and test for presence.
|
||||
app = hb.Appender(context.Background())
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 11, 1)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 11, 1)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
q, err = NewBlockQuerier(hb, 0, 100000)
|
||||
|
@ -702,7 +702,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
|
|||
|
||||
for i := 0; i < numSamples; i++ {
|
||||
app := hb.Appender(context.Background())
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0)
|
||||
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
@ -800,7 +800,7 @@ func TestDelete_e2e(t *testing.T) {
|
|||
ts := rand.Int63n(300)
|
||||
for i := 0; i < numDatapoints; i++ {
|
||||
v := rand.Float64()
|
||||
_, err := app.Add(ls, ts, v)
|
||||
_, err := app.Append(0, ls, ts, v)
|
||||
require.NoError(t, err)
|
||||
series = append(series, sample{ts, v})
|
||||
ts += rand.Int63n(timeInterval) + 1
|
||||
|
@ -1145,7 +1145,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
|
|||
|
||||
app := h.appender()
|
||||
lset := labels.FromStrings("a", "1")
|
||||
_, err := app.Add(lset, 2100, 1)
|
||||
_, err := app.Append(0, lset, 2100, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, h.Truncate(2000))
|
||||
|
@ -1175,7 +1175,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
|||
|
||||
app := h.appender()
|
||||
lset := labels.FromStrings("a", "1")
|
||||
_, err := app.Add(lset, 2100, 1)
|
||||
_, err := app.Append(0, lset, 2100, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, h.Truncate(2000))
|
||||
|
@ -1205,7 +1205,7 @@ func TestHead_LogRollback(t *testing.T) {
|
|||
}()
|
||||
|
||||
app := h.Appender(context.Background())
|
||||
_, err := app.Add(labels.FromStrings("a", "b"), 1, 2)
|
||||
_, err := app.Append(0, labels.FromStrings("a", "b"), 1, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, app.Rollback())
|
||||
|
@ -1397,7 +1397,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
|
|||
}()
|
||||
add := func(ts int64) {
|
||||
app := h.Appender(context.Background())
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0)
|
||||
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, ts, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
@ -1428,7 +1428,7 @@ func TestAddDuplicateLabelName(t *testing.T) {
|
|||
|
||||
add := func(labels labels.Labels, labelName string) {
|
||||
app := h.Appender(context.Background())
|
||||
_, err := app.Add(labels, 0, 0)
|
||||
_, err := app.Append(0, labels, 0, 0)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error())
|
||||
}
|
||||
|
@ -1489,7 +1489,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
app = a
|
||||
}
|
||||
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
@ -1517,7 +1517,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
// Cleanup appendIDs below 500.
|
||||
app := hb.appender()
|
||||
app.cleanupAppendIDsBelow = 500
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
i++
|
||||
|
@ -1536,7 +1536,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
// the only thing with appendIDs.
|
||||
app = hb.appender()
|
||||
app.cleanupAppendIDsBelow = 1000
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, 999, lastValue(hb, 998))
|
||||
|
@ -1550,7 +1550,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
// Cleanup appendIDs below 1001, but with a rollback.
|
||||
app = hb.appender()
|
||||
app.cleanupAppendIDsBelow = 1001
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Rollback())
|
||||
require.Equal(t, 1000, lastValue(hb, 999))
|
||||
|
@ -1586,7 +1586,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
// Cleanup appendIDs below 1000, which means the sample buffer is
|
||||
// the only thing with appendIDs.
|
||||
app = hb.appender()
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
i++
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1599,7 +1599,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
|
||||
// Cleanup appendIDs below 1002, but with a rollback.
|
||||
app = hb.appender()
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Rollback())
|
||||
require.Equal(t, 1001, lastValue(hb, 999))
|
||||
|
@ -1617,21 +1617,21 @@ func TestIsolationRollback(t *testing.T) {
|
|||
}()
|
||||
|
||||
app := hb.Appender(context.Background())
|
||||
_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, uint64(1), hb.iso.lowWatermark())
|
||||
|
||||
app = hb.Appender(context.Background())
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1)
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 1, 1)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
|
||||
require.Error(t, err)
|
||||
require.NoError(t, app.Rollback())
|
||||
require.Equal(t, uint64(2), hb.iso.lowWatermark())
|
||||
|
||||
app = hb.Appender(context.Background())
|
||||
_, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3)
|
||||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 3, 3)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Equal(t, uint64(3), hb.iso.lowWatermark(), "Low watermark should proceed to 3 even if append #2 was rolled back.")
|
||||
|
@ -1644,18 +1644,18 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
|
|||
}()
|
||||
|
||||
app1 := hb.Appender(context.Background())
|
||||
_, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0)
|
||||
_, err := app1.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app1.Commit())
|
||||
require.Equal(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.")
|
||||
|
||||
app1 = hb.Appender(context.Background())
|
||||
_, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1)
|
||||
_, err = app1.Append(0, labels.FromStrings("foo", "bar"), 1, 1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.")
|
||||
|
||||
app2 := hb.Appender(context.Background())
|
||||
_, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1)
|
||||
_, err = app2.Append(0, labels.FromStrings("foo", "baz"), 1, 1)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app2.Commit())
|
||||
require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not committed yet.")
|
||||
|
@ -1701,7 +1701,7 @@ func TestIsolationWithoutAdd(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
|
||||
app = hb.Appender(context.Background())
|
||||
_, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1)
|
||||
_, err := app.Append(0, labels.FromStrings("foo", "baz"), 1, 1)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -1725,7 +1725,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
app := db.Appender(ctx)
|
||||
for i := 1; i <= 5; i++ {
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), int64(i), 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), int64(i), 99)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1733,22 +1733,22 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
// Test out of order metric.
|
||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), 2, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), 2, 99)
|
||||
require.Equal(t, storage.ErrOutOfOrderSample, err)
|
||||
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), 3, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), 3, 99)
|
||||
require.Equal(t, storage.ErrOutOfOrderSample, err)
|
||||
require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), 4, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), 4, 99)
|
||||
require.Equal(t, storage.ErrOutOfOrderSample, err)
|
||||
require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Compact Head to test out of bound metric.
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -1757,11 +1757,11 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
require.Greater(t, db.head.minValidTime.Load(), int64(0))
|
||||
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99)
|
||||
require.Equal(t, storage.ErrOutOfBounds, err)
|
||||
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
|
||||
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99)
|
||||
require.Equal(t, storage.ErrOutOfBounds, err)
|
||||
require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1769,22 +1769,22 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
|
|||
// Some more valid samples for out of order.
|
||||
app = db.Appender(ctx)
|
||||
for i := 1; i <= 5; i++ {
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Test out of order metric.
|
||||
app = db.Appender(ctx)
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99)
|
||||
require.Equal(t, storage.ErrOutOfOrderSample, err)
|
||||
require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99)
|
||||
require.Equal(t, storage.ErrOutOfOrderSample, err)
|
||||
require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
|
||||
_, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99)
|
||||
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99)
|
||||
require.Equal(t, storage.ErrOutOfOrderSample, err)
|
||||
require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1798,10 +1798,10 @@ func testHeadSeriesChunkRace(t *testing.T) {
|
|||
require.NoError(t, h.Init(0))
|
||||
app := h.Appender(context.Background())
|
||||
|
||||
s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0)
|
||||
s2, err := app.Append(0, labels.FromStrings("foo2", "bar"), 5, 0)
|
||||
require.NoError(t, err)
|
||||
for ts := int64(6); ts < 11; ts++ {
|
||||
err = app.AddFast(s2, ts, 0)
|
||||
_, err = app.Append(s2, nil, ts, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1847,7 +1847,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
|
|||
|
||||
app := head.Appender(context.Background())
|
||||
for i, name := range expectedLabelNames {
|
||||
_, err := app.Add(labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0)
|
||||
_, err := app.Append(0, labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -1892,7 +1892,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) {
|
|||
|
||||
app := head.Appender(context.Background())
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := app.Add(labels.Labels{
|
||||
_, err := app.Append(0, labels.Labels{
|
||||
{Name: "unique", Value: fmt.Sprintf("value%d", i)},
|
||||
{Name: "tens", Value: fmt.Sprintf("value%d", i/10)},
|
||||
}, 100, 0)
|
||||
|
@ -1952,28 +1952,28 @@ func TestErrReuseAppender(t *testing.T) {
|
|||
}()
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0)
|
||||
_, err := app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 0, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Error(t, app.Commit())
|
||||
require.Error(t, app.Rollback())
|
||||
|
||||
app = head.Appender(context.Background())
|
||||
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 1, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Rollback())
|
||||
require.Error(t, app.Rollback())
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
app = head.Appender(context.Background())
|
||||
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 2, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
require.Error(t, app.Rollback())
|
||||
require.Error(t, app.Commit())
|
||||
|
||||
app = head.Appender(context.Background())
|
||||
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 3, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Rollback())
|
||||
require.Error(t, app.Commit())
|
||||
|
@ -1985,11 +1985,11 @@ func TestHeadMintAfterTruncation(t *testing.T) {
|
|||
head, _ := newTestHead(t, chunkRange, false)
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 100, 100)
|
||||
_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 4000, 200)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 4000, 200)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 8000, 300)
|
||||
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 8000, 300)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
|
@ -2023,7 +2023,7 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
|
|||
|
||||
metricCount := 1000000
|
||||
for i := 0; i < metricCount; i++ {
|
||||
_, err := app.Add(labels.Labels{
|
||||
_, err := app.Append(0, labels.Labels{
|
||||
{Name: "unique", Value: fmt.Sprintf("value%d", i)},
|
||||
{Name: "tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))},
|
||||
{Name: "ninety", Value: fmt.Sprintf("value%d", i/(metricCount/10)/9)}, // "0" for the first 90%, then "1"
|
||||
|
|
|
@ -48,7 +48,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
|
|||
|
||||
app := h.Appender(context.Background())
|
||||
addSeries := func(l labels.Labels) {
|
||||
app.Add(l, 0, 0)
|
||||
app.Append(0, l, 0, 0)
|
||||
}
|
||||
|
||||
for n := 0; n < 10; n++ {
|
||||
|
@ -158,7 +158,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||
app := h.Appender(context.Background())
|
||||
numSeries := 1000000
|
||||
for i := 0; i < numSeries; i++ {
|
||||
app.Add(labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
|
||||
app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
|
||||
}
|
||||
require.NoError(b, app.Commit())
|
||||
|
||||
|
|
|
@ -417,7 +417,7 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
|
|||
for _, s := range testData {
|
||||
for _, chk := range s.chunks {
|
||||
for _, sample := range chk {
|
||||
_, err = app.Add(labels.FromMap(s.lset), sample.t, sample.v)
|
||||
_, err = app.Append(0, labels.FromMap(s.lset), sample.t, sample.v)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
@ -1579,11 +1579,11 @@ func TestPostingsForMatchers(t *testing.T) {
|
|||
}()
|
||||
|
||||
app := h.Appender(context.Background())
|
||||
app.Add(labels.FromStrings("n", "1"), 0, 0)
|
||||
app.Add(labels.FromStrings("n", "1", "i", "a"), 0, 0)
|
||||
app.Add(labels.FromStrings("n", "1", "i", "b"), 0, 0)
|
||||
app.Add(labels.FromStrings("n", "2"), 0, 0)
|
||||
app.Add(labels.FromStrings("n", "2.5"), 0, 0)
|
||||
app.Append(0, labels.FromStrings("n", "1"), 0, 0)
|
||||
app.Append(0, labels.FromStrings("n", "1", "i", "a"), 0, 0)
|
||||
app.Append(0, labels.FromStrings("n", "1", "i", "b"), 0, 0)
|
||||
app.Append(0, labels.FromStrings("n", "2"), 0, 0)
|
||||
app.Append(0, labels.FromStrings("n", "2.5"), 0, 0)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
cases := []struct {
|
||||
|
|
|
@ -50,14 +50,10 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l
|
|||
for _, s := range series {
|
||||
ref := uint64(0)
|
||||
it := s.Iterator()
|
||||
lset := s.Labels()
|
||||
for it.Next() {
|
||||
t, v := it.At()
|
||||
if ref != 0 {
|
||||
if err := app.AddFast(ref, t, v); err == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
ref, err = app.Add(s.Labels(), t, v)
|
||||
ref, err = app.Append(ref, lset, t, v)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue