diff --git a/db.go b/db.go index 2e225f3e3..514c1661a 100644 --- a/db.go +++ b/db.go @@ -352,7 +352,7 @@ func (a *dbAppender) SetSeries(lset labels.Labels) (uint64, error) { if err != nil { return 0, err } - return ref | (uint64(a.gen) << 32), nil + return ref | (uint64(a.gen) << 40), nil } func (a *dbAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error) { @@ -360,13 +360,13 @@ func (a *dbAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error) if err != nil { return 0, err } - return ref | (uint64(a.gen) << 32), nil + return ref | (uint64(a.gen) << 40), nil } func (a *dbAppender) Add(ref uint64, t int64, v float64) error { // We store the head generation in the 4th byte and use it to reject // stale references. - gen := uint8((ref << 24) >> 56) + gen := uint8((ref << 16) >> 56) if gen != a.gen { return errNotFound @@ -647,11 +647,11 @@ func (a *partitionedAppender) SetSeries(lset labels.Labels) (uint64, error) { if err != nil { return 0, err } - return ref | (p << 40), nil + return ref | (p << 48), nil } func (a *partitionedAppender) Add(ref uint64, t int64, v float64) error { - p := uint8((ref << 16) >> 56) + p := uint8((ref << 8) >> 56) return a.partitions[p].Add(ref, t, v) } diff --git a/head.go b/head.go index 3fc39b854..010ed2b8b 100644 --- a/head.go +++ b/head.go @@ -3,9 +3,9 @@ package tsdb import ( "errors" "math" + "math/rand" "sort" "sync" - "sync/atomic" "time" "github.com/bradfitz/slice" @@ -29,8 +29,6 @@ type headBlock struct { // to their chunk descs. 
hashes map[uint64][]*memSeries - nextSeriesID uint64 - values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms @@ -62,11 +60,10 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { err = wal.ReadAll(&walHandler{ series: func(lset labels.Labels) { - b.create(uint32(b.nextSeriesID), lset.Hash(), lset) - b.nextSeriesID++ + b.create(lset.Hash(), lset) b.stats.SeriesCount++ }, - sample: func(s hashedSample) { + sample: func(s refdSample) { si := s.ref cd := b.series[si] @@ -112,27 +109,27 @@ func (h *headBlock) Appender() Appender { var headPool = sync.Pool{} -func getHeadAppendBuffer() []hashedSample { +func getHeadAppendBuffer() []refdSample { b := headPool.Get() if b == nil { - return make([]hashedSample, 0, 512) + return make([]refdSample, 0, 512) } - return b.([]hashedSample) + return b.([]refdSample) } -func putHeadAppendBuffer(b []hashedSample) { +func putHeadAppendBuffer(b []refdSample) { headPool.Put(b[:0]) } type headAppender struct { *headBlock - newSeries map[uint32]hashedLabels - newHashes map[uint64]uint32 + newSeries map[uint64]hashedLabels + newHashes map[uint64]uint64 + refmap map[uint64]uint64 newLabels []labels.Labels - newRefs []uint32 - samples []hashedSample + samples []refdSample } type hashedLabels struct { @@ -140,6 +137,12 @@ type hashedLabels struct { labels labels.Labels } +type refdSample struct { + ref uint64 + t int64 + v float64 +} + func (a *headAppender) SetSeries(lset labels.Labels) (uint64, error) { return a.setSeries(lset.Hash(), lset) } @@ -152,35 +155,41 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error return uint64(ref), nil } - id := atomic.AddUint64(&a.nextSeriesID, 1) - 1 - if a.newSeries == nil { - a.newSeries = map[uint32]hashedLabels{} - a.newHashes = map[uint64]uint32{} - } - a.newSeries[uint32(id)] = hashedLabels{hash: hash, labels: lset} - a.newHashes[hash] = uint32(id) - a.newRefs = append(a.newRefs, uint32(id)) + 
// We only know the actual reference after committing. We generate an + // intermediate reference only valid for this batch. + // It is indicated by the LSB of the 4th byte being set to 1. + // We use a random ID to avoid collisions when new series are created + // in two subsequent batches. (TODO(fabxc): safe enough?) + ref := uint64(rand.Int31()) | (1 << 32) - return id, nil + if a.newSeries == nil { + a.newSeries = map[uint64]hashedLabels{} + a.newHashes = map[uint64]uint64{} + a.refmap = map[uint64]uint64{} + } + a.newSeries[ref] = hashedLabels{hash: hash, labels: lset} + a.newHashes[hash] = ref + + return ref, nil } func (a *headAppender) Add(ref uint64, t int64, v float64) error { - // We only act on the last 4 bytes. Anything before is used by higher-order - // appenders. We erase it to avoid issues. - ref = (ref << 32) >> 32 + // We only own the first 5 bytes of the reference. Anything before is + // used by higher-order appenders. We erase it to avoid issues. + ref = (ref << 31) >> 31 // Distinguish between existing series and series created in // this transaction. - if int(ref) >= len(a.series) { - if _, ok := a.newSeries[uint32(ref)]; !ok { + if ref&(1<<32) > 0 { + if _, ok := a.newSeries[ref]; !ok { return errNotFound } // TODO(fabxc): we also have to validate here that the // sample sequence is valid. // We also have to revalidate it as we switch locks an create // the new series. 
- a.samples = append(a.samples, hashedSample{ - ref: uint32(ref), + a.samples = append(a.samples, refdSample{ + ref: ref, t: t, v: v, }) @@ -202,8 +211,8 @@ func (a *headAppender) Add(ref uint64, t int64, v float64) error { return ErrAmendSample } - a.samples = append(a.samples, hashedSample{ - ref: uint32(ref), + a.samples = append(a.samples, refdSample{ + ref: ref, t: t, v: v, }) @@ -215,21 +224,27 @@ func (a *headAppender) createSeries() { return } a.newLabels = make([]labels.Labels, 0, len(a.newSeries)) + base0 := len(a.series) a.mtx.RUnlock() a.mtx.Lock() - for _, ref := range a.newRefs { - l := a.newSeries[ref] + base1 := len(a.series) + + for ref, l := range a.newSeries { // We switched locks and have to re-validate that the series were not // created by another goroutine in the meantime. - if int(ref) < len(a.series) && a.series[ref] != nil { - continue + if base1 > base0 { + if ms := a.get(l.hash, l.labels); ms != nil { + a.refmap[ref] = uint64(ms.ref) + continue + } } // Series is still new. a.newLabels = append(a.newLabels, l.labels) + a.refmap[ref] = uint64(len(a.series)) - a.create(ref, l.hash, l.labels) + a.create(l.hash, l.labels) } a.mtx.Unlock() @@ -253,7 +268,12 @@ func (a *headAppender) Commit() error { maxt = int64(math.MinInt64) ) - for _, s := range a.samples { + for i := range a.samples { + s := &a.samples[i] + + if s.ref&(1<<32) > 0 { + s.ref = a.refmap[s.ref] + } if !a.series[s.ref].append(s.t, s.v) { total-- } @@ -401,17 +421,14 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (h *headBlock) create(ref uint32, hash uint64, lset labels.Labels) *memSeries { +func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { s := &memSeries{ - ref: ref, lset: lset, + ref: uint32(len(h.series)), } // Allocate empty space until we can insert at the given index. 
- for int(ref) >= len(h.series) { - h.series = append(h.series, nil) - } - h.series[ref] = s + h.series = append(h.series, s) h.hashes[hash] = append(h.hashes[hash], s) diff --git a/postings.go b/postings.go index e8a4a4621..b3afe831a 100644 --- a/postings.go +++ b/postings.go @@ -26,23 +26,7 @@ func (p *memPostings) get(t term) Postings { // term argument appears twice. func (p *memPostings) add(id uint32, terms ...term) { for _, t := range terms { - // We expect IDs to roughly be appended in order but some concurrency - // related out of order at the end. We do insertion sort from the end - // to account for it. - l := p.m[t] - i := len(l) - 1 - - for ; i >= 0; i-- { - if id > l[i] { - break - } - } - l = append(l, 0) - - copy(l[i+2:], l[i+1:]) - l[i+1] = id - - p.m[t] = l + p.m[t] = append(p.m[t], id) } } diff --git a/wal.go b/wal.go index b22906e93..1929d58f2 100644 --- a/wal.go +++ b/wal.go @@ -88,7 +88,7 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error } type walHandler struct { - sample func(hashedSample) + sample func(refdSample) series func(labels.Labels) } @@ -110,7 +110,7 @@ func (w *WAL) ReadAll(h *walHandler) error { } // Log writes a batch of new series labels and samples to the log. -func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error { +func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { if err := w.enc.encodeSeries(series); err != nil { return err } @@ -268,7 +268,7 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error { return e.entry(WALEntrySeries, walSeriesSimple, buf) } -func (e *walEncoder) encodeSamples(samples []hashedSample) error { +func (e *walEncoder) encodeSamples(samples []refdSample) error { if len(samples) == 0 { return nil } @@ -282,7 +282,7 @@ func (e *walEncoder) encodeSamples(samples []hashedSample) error { // TODO(fabxc): optimize for all samples having the same timestamp. 
first := samples[0] - binary.BigEndian.PutUint32(b, first.ref) + binary.BigEndian.PutUint64(b, first.ref) buf = append(buf, b[:4]...) binary.BigEndian.PutUint64(b, uint64(first.t)) buf = append(buf, b[:8]...) @@ -351,20 +351,21 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error { return errors.Wrap(errInvalidSize, "header length") } var ( - baseRef = binary.BigEndian.Uint32(b) + baseRef = binary.BigEndian.Uint64(b) baseTime = int64(binary.BigEndian.Uint64(b[4:])) ) b = b[12:] for len(b) > 0 { - var smpl hashedSample + var smpl refdSample dref, n := binary.Varint(b) if n < 1 { return errors.Wrap(errInvalidSize, "sample ref delta") } b = b[n:] - smpl.ref = uint32(int64(baseRef) + dref) + + smpl.ref = uint64(int64(baseRef) + dref) dtime, n := binary.Varint(b) if n < 1 {