mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
Switch append refs to string
This commit is contained in:
parent
c8438cfc81
commit
285bc07030
|
@ -197,7 +197,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
|
||||||
type sample struct {
|
type sample struct {
|
||||||
labels labels.Labels
|
labels labels.Labels
|
||||||
value int64
|
value int64
|
||||||
ref *uint64
|
ref *string
|
||||||
}
|
}
|
||||||
|
|
||||||
scrape := make([]*sample, 0, len(metrics))
|
scrape := make([]*sample, 0, len(metrics))
|
||||||
|
|
30
db.go
30
db.go
|
@ -86,11 +86,11 @@ type Appender interface {
|
||||||
// Returned reference numbers are ephemeral and may be rejected in calls
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
||||||
// to AddFast() at any point. Adding the sample via Add() returns a new
|
// to AddFast() at any point. Adding the sample via Add() returns a new
|
||||||
// reference number.
|
// reference number.
|
||||||
Add(l labels.Labels, t int64, v float64) (uint64, error)
|
Add(l labels.Labels, t int64, v float64) (string, error)
|
||||||
|
|
||||||
// Add adds a sample pair for the referenced series. It is generally faster
|
// Add adds a sample pair for the referenced series. It is generally faster
|
||||||
// than adding a sample by providing its full label set.
|
// than adding a sample by providing its full label set.
|
||||||
AddFast(ref uint64, t int64, v float64) error
|
AddFast(ref string, t int64, v float64) error
|
||||||
|
|
||||||
// Commit submits the collected samples and purges the batch.
|
// Commit submits the collected samples and purges the batch.
|
||||||
Commit() error
|
Commit() error
|
||||||
|
@ -517,34 +517,33 @@ type metaAppender struct {
|
||||||
app Appender
|
app Appender
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||||
h, err := a.appenderFor(t)
|
h, err := a.appenderFor(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return "", err
|
||||||
}
|
}
|
||||||
ref, err := h.app.Add(lset, t, v)
|
ref, err := h.app.Add(lset, t, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return "", err
|
||||||
}
|
}
|
||||||
a.samples++
|
a.samples++
|
||||||
// Store last byte of sequence number in 3rd byte of reference.
|
|
||||||
return ref | (uint64(h.meta.Sequence&0xff) << 40), nil
|
return string(append(h.meta.ULID[:], ref...)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
|
func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
|
||||||
// Load the head last byte of the head sequence from the 3rd byte of the
|
// Load the head last byte of the head sequence from the 3rd byte of the
|
||||||
// reference number.
|
// reference number.
|
||||||
gen := (ref << 16) >> 56
|
// gen := (ref << 16) >> 56
|
||||||
|
|
||||||
h, err := a.appenderFor(t)
|
h, err := a.appenderFor(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// If the last byte of the sequence does not add up, the reference is not valid.
|
if yoloString(h.meta.ULID[:]) != ref[:16] {
|
||||||
if uint64(h.meta.Sequence&0xff) != gen {
|
return errors.Wrap(ErrNotFound, "unexpected ULID")
|
||||||
return ErrNotFound
|
|
||||||
}
|
}
|
||||||
if err := h.app.AddFast(ref, t, v); err != nil {
|
if err := h.app.AddFast(ref[16:], t, v); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -870,9 +869,8 @@ func (es MultiError) Err() error {
|
||||||
return es
|
return es
|
||||||
}
|
}
|
||||||
|
|
||||||
func yoloString(b []byte) string {
|
func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) }
|
||||||
return *((*string)(unsafe.Pointer(&b)))
|
func yoloBytes(s string) []byte { return *((*[]byte)(unsafe.Pointer(&s))) }
|
||||||
}
|
|
||||||
|
|
||||||
func closeAll(cs ...io.Closer) error {
|
func closeAll(cs ...io.Closer) error {
|
||||||
var merr MultiError
|
var merr MultiError
|
||||||
|
|
30
db_test.go
30
db_test.go
|
@ -18,6 +18,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -113,19 +114,30 @@ func TestDBAppenderAddRef(t *testing.T) {
|
||||||
ref, err := app.Add(labels.FromStrings("a", "b"), 0, 0)
|
ref, err := app.Add(labels.FromStrings("a", "b"), 0, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Head sequence number should be in 3rd MSB and be greater than 0.
|
// When a series is first created, refs don't work within that transaction.
|
||||||
gen := (ref << 16) >> 56
|
err = app.AddFast(ref, 1, 1)
|
||||||
require.True(t, gen > 1)
|
require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
|
||||||
|
|
||||||
|
err = app.Commit()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
app = db.Appender()
|
||||||
|
|
||||||
|
ref, err = app.Add(labels.FromStrings("a", "b"), 1, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Ref must be prefixed with block ULID of the block we wrote to.
|
||||||
|
id := db.blocks[len(db.blocks)-1].Meta().ULID
|
||||||
|
require.Equal(t, string(id[:]), ref[:16])
|
||||||
|
|
||||||
// Reference must be valid to add another sample.
|
// Reference must be valid to add another sample.
|
||||||
err = app.AddFast(ref, 1, 1)
|
err = app.AddFast(ref, 2, 2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// AddFast for the same timestamp must fail if the generation in the reference
|
// AddFast for the same timestamp must fail if the generation in the reference
|
||||||
// doesn't add up.
|
// doesn't add up.
|
||||||
refBad := ref | ((gen + 1) << 4)
|
refb := []byte(ref)
|
||||||
err = app.AddFast(refBad, 1, 1)
|
refb[15] ^= refb[15]
|
||||||
require.Error(t, err)
|
err = app.AddFast(string(refb), 1, 1)
|
||||||
|
require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
|
||||||
require.Equal(t, 2, app.(*dbAppender).samples)
|
|
||||||
}
|
}
|
||||||
|
|
107
head.go
107
head.go
|
@ -23,6 +23,8 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"encoding/binary"
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -176,6 +178,7 @@ func (h *HeadBlock) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Meta returns a BlockMeta for the head block.
|
||||||
func (h *HeadBlock) Meta() BlockMeta {
|
func (h *HeadBlock) Meta() BlockMeta {
|
||||||
m := BlockMeta{
|
m := BlockMeta{
|
||||||
ULID: h.meta.ULID,
|
ULID: h.meta.ULID,
|
||||||
|
@ -192,11 +195,16 @@ func (h *HeadBlock) Meta() BlockMeta {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dir returns the directory of the block.
|
||||||
func (h *HeadBlock) Dir() string { return h.dir }
|
func (h *HeadBlock) Dir() string { return h.dir }
|
||||||
func (h *HeadBlock) Persisted() bool { return false }
|
|
||||||
|
// Index returns an IndexReader against the block.
|
||||||
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
|
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
|
||||||
|
|
||||||
|
// Chunks returns a ChunkReader against the block.
|
||||||
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
||||||
|
|
||||||
|
// Querier returns a new Querier against the block for the range [mint, maxt].
|
||||||
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||||
h.mtx.RLock()
|
h.mtx.RLock()
|
||||||
defer h.mtx.RUnlock()
|
defer h.mtx.RUnlock()
|
||||||
|
@ -236,6 +244,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Appender returns a new Appender against the head block.
|
||||||
func (h *HeadBlock) Appender() Appender {
|
func (h *HeadBlock) Appender() Appender {
|
||||||
atomic.AddUint64(&h.activeWriters, 1)
|
atomic.AddUint64(&h.activeWriters, 1)
|
||||||
|
|
||||||
|
@ -247,6 +256,7 @@ func (h *HeadBlock) Appender() Appender {
|
||||||
return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
|
return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Busy returns true if the block has open write transactions.
|
||||||
func (h *HeadBlock) Busy() bool {
|
func (h *HeadBlock) Busy() bool {
|
||||||
return atomic.LoadUint64(&h.activeWriters) > 0
|
return atomic.LoadUint64(&h.activeWriters) > 0
|
||||||
}
|
}
|
||||||
|
@ -268,74 +278,86 @@ func putHeadAppendBuffer(b []RefSample) {
|
||||||
type headAppender struct {
|
type headAppender struct {
|
||||||
*HeadBlock
|
*HeadBlock
|
||||||
|
|
||||||
newSeries map[uint64]hashedLabels
|
newSeries []*hashedLabels
|
||||||
newHashes map[uint64]uint64
|
|
||||||
refmap map[uint64]uint64
|
|
||||||
newLabels []labels.Labels
|
newLabels []labels.Labels
|
||||||
|
newHashes map[uint64]uint64
|
||||||
|
|
||||||
samples []RefSample
|
samples []RefSample
|
||||||
}
|
}
|
||||||
|
|
||||||
type hashedLabels struct {
|
type hashedLabels struct {
|
||||||
|
ref uint64
|
||||||
hash uint64
|
hash uint64
|
||||||
labels labels.Labels
|
labels labels.Labels
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||||
if !a.inBounds(t) {
|
if !a.inBounds(t) {
|
||||||
return 0, ErrOutOfBounds
|
return "", ErrOutOfBounds
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := lset.Hash()
|
hash := lset.Hash()
|
||||||
|
refb := make([]byte, 8)
|
||||||
|
|
||||||
|
// Series exists already in the block.
|
||||||
if ms := a.get(hash, lset); ms != nil {
|
if ms := a.get(hash, lset); ms != nil {
|
||||||
return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v)
|
binary.BigEndian.PutUint64(refb, uint64(ms.ref))
|
||||||
|
return string(refb), a.AddFast(string(refb), t, v)
|
||||||
}
|
}
|
||||||
|
// Series was added in this transaction previously.
|
||||||
if ref, ok := a.newHashes[hash]; ok {
|
if ref, ok := a.newHashes[hash]; ok {
|
||||||
return uint64(ref), a.AddFast(uint64(ref), t, v)
|
binary.BigEndian.PutUint64(refb, ref)
|
||||||
|
// XXX(fabxc): there's no fast path for multiple samples for the same new series
|
||||||
|
// in the same transaction. We always return the invalid empty ref. It's has not
|
||||||
|
// been a relevant use case so far and is not worth the trouble.
|
||||||
|
return nullRef, a.AddFast(string(refb), t, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We only know the actual reference after committing. We generate an
|
// The series is completely new.
|
||||||
// intermediate reference only valid for this batch.
|
|
||||||
// It is indicated by the the LSB of the 4th byte being set to 1.
|
|
||||||
// We use a random ID to avoid collisions when new series are created
|
|
||||||
// in two subsequent batches.
|
|
||||||
// TODO(fabxc): Provide method for client to determine whether a ref
|
|
||||||
// is valid beyond the current transaction.
|
|
||||||
ref := uint64(rand.Int31()) | (1 << 32)
|
|
||||||
|
|
||||||
if a.newSeries == nil {
|
if a.newSeries == nil {
|
||||||
a.newSeries = map[uint64]hashedLabels{}
|
|
||||||
a.newHashes = map[uint64]uint64{}
|
a.newHashes = map[uint64]uint64{}
|
||||||
a.refmap = map[uint64]uint64{}
|
|
||||||
}
|
}
|
||||||
a.newSeries[ref] = hashedLabels{hash: hash, labels: lset}
|
// First sample for new series.
|
||||||
a.newHashes[hash] = ref
|
ref := uint64(len(a.newSeries))
|
||||||
|
|
||||||
return ref, a.AddFast(ref, t, v)
|
a.newSeries = append(a.newSeries, &hashedLabels{
|
||||||
|
ref: ref,
|
||||||
|
hash: hash,
|
||||||
|
labels: lset,
|
||||||
|
})
|
||||||
|
// First bit indicates its a series created in this transaction.
|
||||||
|
ref |= (1 << 63)
|
||||||
|
|
||||||
|
a.newHashes[hash] = ref
|
||||||
|
binary.BigEndian.PutUint64(refb, ref)
|
||||||
|
|
||||||
|
return nullRef, a.AddFast(string(refb), t, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0})
|
||||||
// We only own the last 5 bytes of the reference. Anything before is
|
|
||||||
// used by higher-order appenders. We erase it to avoid issues.
|
|
||||||
ref = (ref << 24) >> 24
|
|
||||||
|
|
||||||
|
func (a *headAppender) AddFast(ref string, t int64, v float64) error {
|
||||||
|
var (
|
||||||
|
refn = binary.BigEndian.Uint64(yoloBytes(ref))
|
||||||
|
id = (refn << 1) >> 1
|
||||||
|
inTx = refn&(1<<63) != 0
|
||||||
|
)
|
||||||
// Distinguish between existing series and series created in
|
// Distinguish between existing series and series created in
|
||||||
// this transaction.
|
// this transaction.
|
||||||
if ref&(1<<32) != 0 {
|
if inTx {
|
||||||
if _, ok := a.newSeries[ref]; !ok {
|
if id > uint64(len(a.newSeries)-1) {
|
||||||
return ErrNotFound
|
return errors.Wrap(ErrNotFound, "transaction series ID too high")
|
||||||
}
|
}
|
||||||
// TODO(fabxc): we also have to validate here that the
|
// TODO(fabxc): we also have to validate here that the
|
||||||
// sample sequence is valid.
|
// sample sequence is valid.
|
||||||
// We also have to revalidate it as we switch locks an create
|
// We also have to revalidate it as we switch locks and create
|
||||||
// the new series.
|
// the new series.
|
||||||
} else if ref > uint64(len(a.series)) {
|
} else if id > uint64(len(a.series)) {
|
||||||
return ErrNotFound
|
return errors.Wrap(ErrNotFound, "transaction series ID too high")
|
||||||
} else {
|
} else {
|
||||||
ms := a.series[int(ref)]
|
ms := a.series[id]
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
return ErrNotFound
|
return errors.Wrap(ErrNotFound, "nil series")
|
||||||
}
|
}
|
||||||
// TODO(fabxc): memory series should be locked here already.
|
// TODO(fabxc): memory series should be locked here already.
|
||||||
// Only problem is release of locks in case of a rollback.
|
// Only problem is release of locks in case of a rollback.
|
||||||
|
@ -356,7 +378,7 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
a.samples = append(a.samples, RefSample{
|
a.samples = append(a.samples, RefSample{
|
||||||
Ref: ref,
|
Ref: refn,
|
||||||
T: t,
|
T: t,
|
||||||
V: v,
|
V: v,
|
||||||
})
|
})
|
||||||
|
@ -375,18 +397,18 @@ func (a *headAppender) createSeries() {
|
||||||
|
|
||||||
base1 := len(a.series)
|
base1 := len(a.series)
|
||||||
|
|
||||||
for ref, l := range a.newSeries {
|
for _, l := range a.newSeries {
|
||||||
// We switched locks and have to re-validate that the series were not
|
// We switched locks and have to re-validate that the series were not
|
||||||
// created by another goroutine in the meantime.
|
// created by another goroutine in the meantime.
|
||||||
if base1 > base0 {
|
if base1 > base0 {
|
||||||
if ms := a.get(l.hash, l.labels); ms != nil {
|
if ms := a.get(l.hash, l.labels); ms != nil {
|
||||||
a.refmap[ref] = uint64(ms.ref)
|
l.ref = uint64(ms.ref)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Series is still new.
|
// Series is still new.
|
||||||
a.newLabels = append(a.newLabels, l.labels)
|
a.newLabels = append(a.newLabels, l.labels)
|
||||||
a.refmap[ref] = uint64(len(a.series))
|
l.ref = uint64(len(a.series))
|
||||||
|
|
||||||
a.create(l.hash, l.labels)
|
a.create(l.hash, l.labels)
|
||||||
}
|
}
|
||||||
|
@ -401,11 +423,11 @@ func (a *headAppender) Commit() error {
|
||||||
|
|
||||||
a.createSeries()
|
a.createSeries()
|
||||||
|
|
||||||
|
// We have to update the refs of samples for series we just created.
|
||||||
for i := range a.samples {
|
for i := range a.samples {
|
||||||
s := &a.samples[i]
|
s := &a.samples[i]
|
||||||
|
if s.Ref&(1<<63) != 0 {
|
||||||
if s.Ref&(1<<32) > 0 {
|
s.Ref = a.newSeries[(s.Ref<<1)>>1].ref
|
||||||
s.Ref = a.refmap[s.Ref]
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -514,6 +536,9 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error
|
||||||
return nil, nil, ErrNotFound
|
return nil, nil, ErrNotFound
|
||||||
}
|
}
|
||||||
s := h.series[ref]
|
s := h.series[ref]
|
||||||
|
if s == nil {
|
||||||
|
return nil, nil, ErrNotFound
|
||||||
|
}
|
||||||
metas := make([]*ChunkMeta, 0, len(s.chunks))
|
metas := make([]*ChunkMeta, 0, len(s.chunks))
|
||||||
|
|
||||||
s.mtx.RLock()
|
s.mtx.RLock()
|
||||||
|
|
Loading…
Reference in a new issue