diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index e47254febd..4c0b8cdaae 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -197,7 +197,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount type sample struct { labels labels.Labels value int64 - ref *uint64 + ref *string } scrape := make([]*sample, 0, len(metrics)) diff --git a/db.go b/db.go index c4e97abcc1..6031e7affd 100644 --- a/db.go +++ b/db.go @@ -86,11 +86,11 @@ type Appender interface { // Returned reference numbers are ephemeral and may be rejected in calls // to AddFast() at any point. Adding the sample via Add() returns a new // reference number. - Add(l labels.Labels, t int64, v float64) (uint64, error) + Add(l labels.Labels, t int64, v float64) (string, error) // Add adds a sample pair for the referenced series. It is generally faster // than adding a sample by providing its full label set. - AddFast(ref uint64, t int64, v float64) error + AddFast(ref string, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error @@ -517,34 +517,33 @@ type metaAppender struct { app Appender } -func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { +func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { h, err := a.appenderFor(t) if err != nil { - return 0, err + return "", err } ref, err := h.app.Add(lset, t, v) if err != nil { - return 0, err + return "", err } a.samples++ - // Store last byte of sequence number in 3rd byte of reference. - return ref | (uint64(h.meta.Sequence&0xff) << 40), nil + + return string(append(h.meta.ULID[:], ref...)), nil } -func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { +func (a *dbAppender) AddFast(ref string, t int64, v float64) error { // Load the head last byte of the head sequence from the 3rd byte of the // reference number. - gen := (ref << 16) >> 56 + // gen := (ref << 16) >> 56 h, err := a.appenderFor(t) if err != nil { return err } - // If the last byte of the sequence does not add up, the reference is not valid. - if uint64(h.meta.Sequence&0xff) != gen { - return ErrNotFound + if yoloString(h.meta.ULID[:]) != ref[:16] { + return errors.Wrap(ErrNotFound, "unexpected ULID") } - if err := h.app.AddFast(ref, t, v); err != nil { + if err := h.app.AddFast(ref[16:], t, v); err != nil { return err } @@ -870,9 +869,8 @@ func (es MultiError) Err() error { return es } -func yoloString(b []byte) string { - return *((*string)(unsafe.Pointer(&b))) -} +func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } +func yoloBytes(s string) []byte { return *((*[]byte)(unsafe.Pointer(&s))) } func closeAll(cs ...io.Closer) error { var merr MultiError diff --git a/db_test.go b/db_test.go index f3b2dca2a1..2f6770fff6 100644 --- a/db_test.go +++ b/db_test.go @@ -18,6 +18,7 @@ import ( "os" "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/labels" "github.com/stretchr/testify/require" ) @@ -113,19 +114,30 @@ func TestDBAppenderAddRef(t *testing.T) { ref, err := app.Add(labels.FromStrings("a", "b"), 0, 0) require.NoError(t, err) - // Head sequence number should be in 3rd MSB and be greater than 0. - gen := (ref << 16) >> 56 - require.True(t, gen > 1) + // When a series is first created, refs don't work within that transaction. + err = app.AddFast(ref, 1, 1) + require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) + + err = app.Commit() + require.NoError(t, err) + + app = db.Appender() + + ref, err = app.Add(labels.FromStrings("a", "b"), 1, 1) + require.NoError(t, err) + + // Ref must be prefixed with block ULID of the block we wrote to. + id := db.blocks[len(db.blocks)-1].Meta().ULID + require.Equal(t, string(id[:]), ref[:16]) // Reference must be valid to add another sample. - err = app.AddFast(ref, 1, 1) + err = app.AddFast(ref, 2, 2) require.NoError(t, err) // AddFast for the same timestamp must fail if the generation in the reference // doesn't add up. - refBad := ref | ((gen + 1) << 4) - err = app.AddFast(refBad, 1, 1) - require.Error(t, err) - - require.Equal(t, 2, app.(*dbAppender).samples) + refb := []byte(ref) + refb[15] ^= refb[15] + err = app.AddFast(string(refb), 1, 1) + require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) } diff --git a/head.go b/head.go index b71bbafc09..4a195a795c 100644 --- a/head.go +++ b/head.go @@ -23,6 +23,8 @@ import ( "sync/atomic" "time" + "encoding/binary" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -176,6 +178,7 @@ func (h *HeadBlock) Close() error { return nil } +// Meta returns a BlockMeta for the head block. func (h *HeadBlock) Meta() BlockMeta { m := BlockMeta{ ULID: h.meta.ULID, @@ -192,11 +195,16 @@ func (h *HeadBlock) Meta() BlockMeta { return m } -func (h *HeadBlock) Dir() string { return h.dir } -func (h *HeadBlock) Persisted() bool { return false } -func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } +// Dir returns the directory of the block. +func (h *HeadBlock) Dir() string { return h.dir } + +// Index returns an IndexReader against the block. +func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } + +// Chunks returns a ChunkReader against the block. func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } +// Querier returns a new Querier against the block for the range [mint, maxt]. func (h *HeadBlock) Querier(mint, maxt int64) Querier { h.mtx.RLock() defer h.mtx.RUnlock() @@ -236,6 +244,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { } } +// Appender returns a new Appender against the head block. func (h *HeadBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) @@ -247,6 +256,7 @@ func (h *HeadBlock) Appender() Appender { return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()} } +// Busy returns true if the block has open write transactions. func (h *HeadBlock) Busy() bool { return atomic.LoadUint64(&h.activeWriters) > 0 } @@ -268,74 +278,86 @@ func putHeadAppendBuffer(b []RefSample) { type headAppender struct { *HeadBlock - newSeries map[uint64]hashedLabels - newHashes map[uint64]uint64 - refmap map[uint64]uint64 + newSeries []*hashedLabels newLabels []labels.Labels + newHashes map[uint64]uint64 samples []RefSample } type hashedLabels struct { + ref uint64 hash uint64 labels labels.Labels } -func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { +func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { if !a.inBounds(t) { - return 0, ErrOutOfBounds + return "", ErrOutOfBounds } hash := lset.Hash() + refb := make([]byte, 8) + // Series exists already in the block. if ms := a.get(hash, lset); ms != nil { - return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v) + binary.BigEndian.PutUint64(refb, uint64(ms.ref)) + return string(refb), a.AddFast(string(refb), t, v) } + // Series was added in this transaction previously. if ref, ok := a.newHashes[hash]; ok { - return uint64(ref), a.AddFast(uint64(ref), t, v) + binary.BigEndian.PutUint64(refb, ref) + // XXX(fabxc): there's no fast path for multiple samples for the same new series + // in the same transaction. We always return the invalid empty ref. It's has not + // been a relevant use case so far and is not worth the trouble. + return nullRef, a.AddFast(string(refb), t, v) } - // We only know the actual reference after committing. We generate an - // intermediate reference only valid for this batch. - // It is indicated by the the LSB of the 4th byte being set to 1. - // We use a random ID to avoid collisions when new series are created - // in two subsequent batches. - // TODO(fabxc): Provide method for client to determine whether a ref - // is valid beyond the current transaction. - ref := uint64(rand.Int31()) | (1 << 32) - + // The series is completely new. if a.newSeries == nil { - a.newSeries = map[uint64]hashedLabels{} a.newHashes = map[uint64]uint64{} - a.refmap = map[uint64]uint64{} } - a.newSeries[ref] = hashedLabels{hash: hash, labels: lset} - a.newHashes[hash] = ref + // First sample for new series. + ref := uint64(len(a.newSeries)) - return ref, a.AddFast(ref, t, v) + a.newSeries = append(a.newSeries, &hashedLabels{ + ref: ref, + hash: hash, + labels: lset, + }) + // First bit indicates its a series created in this transaction. + ref |= (1 << 63) + + a.newHashes[hash] = ref + binary.BigEndian.PutUint64(refb, ref) + + return nullRef, a.AddFast(string(refb), t, v) } -func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { - // We only own the last 5 bytes of the reference. Anything before is - // used by higher-order appenders. We erase it to avoid issues. - ref = (ref << 24) >> 24 +var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0}) +func (a *headAppender) AddFast(ref string, t int64, v float64) error { + var ( + refn = binary.BigEndian.Uint64(yoloBytes(ref)) + id = (refn << 1) >> 1 + inTx = refn&(1<<63) != 0 + ) // Distinguish between existing series and series created in // this transaction. - if ref&(1<<32) != 0 { - if _, ok := a.newSeries[ref]; !ok { - return ErrNotFound + if inTx { + if id > uint64(len(a.newSeries)-1) { + return errors.Wrap(ErrNotFound, "transaction series ID too high") } // TODO(fabxc): we also have to validate here that the // sample sequence is valid. - // We also have to revalidate it as we switch locks an create + // We also have to revalidate it as we switch locks and create // the new series. - } else if ref > uint64(len(a.series)) { - return ErrNotFound + } else if id > uint64(len(a.series)) { + return errors.Wrap(ErrNotFound, "transaction series ID too high") } else { - ms := a.series[int(ref)] + ms := a.series[id] if ms == nil { - return ErrNotFound + return errors.Wrap(ErrNotFound, "nil series") } // TODO(fabxc): memory series should be locked here already. // Only problem is release of locks in case of a rollback. @@ -356,7 +378,7 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { } a.samples = append(a.samples, RefSample{ - Ref: ref, + Ref: refn, T: t, V: v, }) @@ -375,18 +397,18 @@ func (a *headAppender) createSeries() { base1 := len(a.series) - for ref, l := range a.newSeries { + for _, l := range a.newSeries { // We switched locks and have to re-validate that the series were not // created by another goroutine in the meantime. if base1 > base0 { if ms := a.get(l.hash, l.labels); ms != nil { - a.refmap[ref] = uint64(ms.ref) + l.ref = uint64(ms.ref) continue } } // Series is still new. a.newLabels = append(a.newLabels, l.labels) - a.refmap[ref] = uint64(len(a.series)) + l.ref = uint64(len(a.series)) a.create(l.hash, l.labels) } @@ -401,11 +423,11 @@ func (a *headAppender) Commit() error { a.createSeries() + // We have to update the refs of samples for series we just created. for i := range a.samples { s := &a.samples[i] - - if s.Ref&(1<<32) > 0 { - s.Ref = a.refmap[s.Ref] + if s.Ref&(1<<63) != 0 { + s.Ref = a.newSeries[(s.Ref<<1)>>1].ref } } @@ -514,6 +536,9 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error return nil, nil, ErrNotFound } s := h.series[ref] + if s == nil { + return nil, nil, ErrNotFound + } metas := make([]*ChunkMeta, 0, len(s.chunks)) s.mtx.RLock()