diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 8d5663467..491c2ac35 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -139,7 +139,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 3000) + total, err = b.ingestScrapes(metrics, 2000) if err != nil { exitWithError(err) } @@ -199,7 +199,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount type sample struct { labels labels.Labels value int64 - ref *string + ref *uint64 } scrape := make([]*sample, 0, len(metrics)) diff --git a/compact.go b/compact.go index ad9700fac..7d8174f0d 100644 --- a/compact.go +++ b/compact.go @@ -17,6 +17,7 @@ import ( "math/rand" "os" "path/filepath" + "runtime" "sort" "time" @@ -364,6 +365,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) + + // We might have done quite a few allocs. Enforce a GC so they do not accumulate + // with subsequent compactions or head GCs. + runtime.GC() }(time.Now()) dir := filepath.Join(dest, meta.ULID.String()) @@ -477,7 +482,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, // We fully rebuild the postings list index from merged series. var ( - postings = &memPostings{m: make(map[term][]uint64, 512)} + postings = newMemPostings() values = map[string]stringset{} i = uint64(0) ) @@ -539,11 +544,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, values[l.Name] = valset } valset.set(l.Value) - - t := term{name: l.Name, value: l.Value} - - postings.add(i, t) } + postings.add(i, lset) + i++ } if set.Err() != nil { @@ -562,8 +565,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - for t := range postings.m { - if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { + for l := range postings.m { + if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil { return errors.Wrap(err, "write postings") } } diff --git a/db.go b/db.go index 2e53f7527..724153bff 100644 --- a/db.go +++ b/db.go @@ -21,7 +21,6 @@ import ( "io/ioutil" "os" "path/filepath" - "runtime" "sort" "strconv" "sync" @@ -76,11 +75,11 @@ type Appender interface { // to AddFast() at any point. Adding the sample via Add() returns a new // reference number. // If the reference is the empty string it must not be used for caching. - Add(l labels.Labels, t int64, v float64) (string, error) + Add(l labels.Labels, t int64, v float64) (uint64, error) // Add adds a sample pair for the referenced series. It is generally faster // than adding a sample by providing its full label set. - AddFast(ref string, t int64, v float64) error + AddFast(ref uint64, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error @@ -350,7 +349,6 @@ func (db *DB) compact() (changes bool, err error) { if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } - runtime.GC() } // Check for compactions of multiple blocks. @@ -383,7 +381,6 @@ func (db *DB) compact() (changes bool, err error) { if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } - runtime.GC() } return changes, nil diff --git a/db_test.go b/db_test.go index 57d885833..fd4389a3a 100644 --- a/db_test.go +++ b/db_test.go @@ -110,27 +110,32 @@ func TestDBAppenderAddRef(t *testing.T) { app1 := db.Appender() - ref, err := app1.Add(labels.FromStrings("a", "b"), 123, 0) + ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0) require.NoError(t, err) - // When a series is first created, refs don't work within that transaction. - err = app1.AddFast(ref, 1, 1) - require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) + // Reference should already work before commit. + err = app1.AddFast(ref1, 124, 1) + require.NoError(t, err) err = app1.Commit() require.NoError(t, err) app2 := db.Appender() - ref, err = app2.Add(labels.FromStrings("a", "b"), 133, 1) + + // first ref should already work in next transaction. + err = app2.AddFast(ref1, 125, 0) require.NoError(t, err) + ref2, err := app2.Add(labels.FromStrings("a", "b"), 133, 1) + require.NoError(t, err) + + require.True(t, ref1 == ref2) + // Reference must be valid to add another sample. - err = app2.AddFast(ref, 143, 2) + err = app2.AddFast(ref2, 143, 2) require.NoError(t, err) - // AddFast for the same timestamp must fail if the generation in the reference - // doesn't add up. - err = app2.AddFast("abc_invalid_xyz", 1, 1) + err = app2.AddFast(9999999, 1, 1) require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) require.NoError(t, app2.Commit()) @@ -141,6 +146,8 @@ func TestDBAppenderAddRef(t *testing.T) { require.Equal(t, map[string][]sample{ labels.FromStrings("a", "b").String(): []sample{ {t: 123, v: 0}, + {t: 124, v: 1}, + {t: 125, v: 0}, {t: 133, v: 1}, {t: 143, v: 2}, }, diff --git a/head.go b/head.go index 1dbfc917d..1759a7d4c 100644 --- a/head.go +++ b/head.go @@ -14,8 +14,8 @@ package tsdb import ( - "encoding/binary" "math" + "runtime" "sort" "sync" "sync/atomic" @@ -48,7 +48,6 @@ var ( // Head handles reads and writes of time series data within a time window. type Head struct { chunkRange int64 - mtx sync.RWMutex metrics *headMetrics wal WAL logger log.Logger @@ -57,16 +56,14 @@ type Head struct { minTime, maxTime int64 lastSeriesID uint64 - // descs holds all chunk descs for the head block. Each chunk implicitly - // is assigned the index as its ID. - series map[uint64]*memSeries - // hashes contains a collision map of label set hashes of chunks - // to their chunk descs. - hashes map[uint64][]*memSeries + // All series addressable by their ID or hash. + series *stripeSeries - symbols map[string]struct{} - values map[string]stringset // label names to possible values - postings *memPostings // postings lists for terms + symMtx sync.RWMutex + symbols map[string]struct{} + values map[string]stringset // label names to possible values + + postings *memPostings // postings lists for terms tombstones tombstoneReader } @@ -178,11 +175,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( chunkRange: chunkRange, minTime: math.MaxInt64, maxTime: math.MinInt64, - series: map[uint64]*memSeries{}, - hashes: map[uint64][]*memSeries{}, + series: newStripeSeries(), values: map[string]stringset{}, symbols: map[string]struct{}{}, - postings: &memPostings{m: make(map[term][]uint64)}, + postings: newMemPostings(), tombstones: newEmptyTombstoneReader(), } h.metrics = newHeadMetrics(h, r) @@ -201,8 +197,8 @@ func (h *Head) readWAL() error { } samplesFunc := func(samples []RefSample) error { for _, s := range samples { - ms, ok := h.series[s.Ref] - if !ok { + ms := h.series.getByID(s.Ref) + if ms == nil { return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) } _, chunkCreated := ms.append(s.T, s.V) @@ -291,7 +287,7 @@ type initAppender struct { head *Head } -func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { +func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if a.app != nil { return a.app.Add(lset, t, v) } @@ -301,7 +297,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, erro return a.app.Add(lset, t, v) } -func (a *initAppender) AddFast(ref string, t int64, v float64) error { +func (a *initAppender) AddFast(ref uint64, t int64, v float64) error { if a.app == nil { return ErrNotFound } @@ -335,8 +331,6 @@ func (h *Head) Appender() Appender { } func (h *Head) appender() *headAppender { - h.mtx.RLock() - return &headAppender{ head: h, mint: h.MaxTime() - h.chunkRange/2, @@ -361,177 +355,71 @@ type headAppender struct { head *Head mint int64 - newSeries []*hashedLabels - createdSeries []RefSeries - newHashes map[uint64]uint64 - + series []RefSeries samples []RefSample highTimestamp int64 } -type hashedLabels struct { - ref uint64 - hash uint64 - labels labels.Labels -} - -func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { +func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if t < a.mint { - return "", ErrOutOfBounds + return 0, ErrOutOfBounds } - hash := lset.Hash() - refb := make([]byte, 8) - // Series exists already in the block. - if ms := a.head.get(hash, lset); ms != nil { - binary.BigEndian.PutUint64(refb, uint64(ms.ref)) - return string(refb), a.AddFast(string(refb), t, v) + s := a.head.series.getByHash(hash, lset) + + if s == nil { + s = a.head.create(hash, lset) + + a.series = append(a.series, RefSeries{ + Ref: s.ref, + Labels: lset, + hash: hash, + }) } - // Series was added in this transaction previously. - if ref, ok := a.newHashes[hash]; ok { - binary.BigEndian.PutUint64(refb, ref) - // XXX(fabxc): there's no fast path for multiple samples for the same new series - // in the same transaction. We always return the invalid empty ref. It's has not - // been a relevant use case so far and is not worth the trouble. - return "", a.AddFast(string(refb), t, v) - } - - // The series is completely new. - if a.newSeries == nil { - a.newHashes = map[uint64]uint64{} - } - // First sample for new series. - ref := uint64(len(a.newSeries)) - - a.newSeries = append(a.newSeries, &hashedLabels{ - ref: ref, - hash: hash, - labels: lset, - }) - // First bit indicates its a series created in this transaction. - ref |= (1 << 63) - - a.newHashes[hash] = ref - binary.BigEndian.PutUint64(refb, ref) - - return "", a.AddFast(string(refb), t, v) + return s.ref, a.AddFast(s.ref, t, v) } -func (a *headAppender) AddFast(ref string, t int64, v float64) error { - if len(ref) != 8 { - return errors.Wrap(ErrNotFound, "invalid ref length") +func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { + s := a.head.series.getByID(ref) + + if s == nil { + return errors.Wrap(ErrNotFound, "unknown series") } - var ( - refn = binary.BigEndian.Uint64(yoloBytes(ref)) - id = (refn << 1) >> 1 - inTx = refn&(1<<63) != 0 - ) - // Distinguish between existing series and series created in - // this transaction. - if inTx { - if id > uint64(len(a.newSeries)-1) { - return errors.Wrap(ErrNotFound, "transaction series ID too high") - } - // TODO(fabxc): we also have to validate here that the - // sample sequence is valid. - // We also have to revalidate it as we switch locks and create - // the new series. - } else { - ms, ok := a.head.series[id] - if !ok { - return errors.Wrap(ErrNotFound, "unknown series") - } - if err := ms.appendable(t, v); err != nil { - return err - } + if err := s.appendable(t, v); err != nil { + return err } + if t < a.mint { return ErrOutOfBounds } - if t > a.highTimestamp { a.highTimestamp = t } a.samples = append(a.samples, RefSample{ - Ref: refn, - T: t, - V: v, + Ref: ref, + T: t, + V: v, + series: s, }) return nil } -func (a *headAppender) createSeries() error { - if len(a.newSeries) == 0 { - return nil - } - a.createdSeries = make([]RefSeries, 0, len(a.newSeries)) - base0 := len(a.head.series) - - a.head.mtx.RUnlock() - defer a.head.mtx.RLock() - a.head.mtx.Lock() - defer a.head.mtx.Unlock() - - base1 := len(a.head.series) - - for _, l := range a.newSeries { - // We switched locks and have to re-validate that the series were not - // created by another goroutine in the meantime. - if base1 > base0 { - if ms := a.head.get(l.hash, l.labels); ms != nil { - l.ref = uint64(ms.ref) - continue - } - } - // Series is still new. - s := a.head.create(l.hash, l.labels) - l.ref = uint64(s.ref) - - a.createdSeries = append(a.createdSeries, RefSeries{Ref: l.ref, Labels: l.labels}) - } - - // Write all new series to the WAL. - if err := a.head.wal.LogSeries(a.createdSeries); err != nil { - return errors.Wrap(err, "WAL log series") - } - - return nil -} - func (a *headAppender) Commit() error { - defer a.head.mtx.RUnlock() + defer a.Rollback() - defer a.head.metrics.activeAppenders.Dec() - defer a.head.putAppendBuffer(a.samples) - - if err := a.createSeries(); err != nil { + if err := a.head.wal.LogSeries(a.series); err != nil { return err } - - // We have to update the refs of samples for series we just created. - for i := range a.samples { - s := &a.samples[i] - if s.Ref&(1<<63) != 0 { - s.Ref = a.newSeries[(s.Ref<<1)>>1].ref - } - } - - // Write all new samples to the WAL and add them to the - // in-mem database on success. if err := a.head.wal.LogSamples(a.samples); err != nil { return errors.Wrap(err, "WAL log samples") } - total := uint64(len(a.samples)) + total := len(a.samples) for _, s := range a.samples { - series, ok := a.head.series[s.Ref] - if !ok { - return errors.Errorf("series with ID %d not found", s.Ref) - } - ok, chunkCreated := series.append(s.T, s.V) + ok, chunkCreated := s.series.append(s.T, s.V) if !ok { total-- } @@ -557,8 +445,6 @@ func (a *headAppender) Commit() error { } func (a *headAppender) Rollback() error { - a.head.mtx.RUnlock() - a.head.metrics.activeAppenders.Dec() a.head.putAppendBuffer(a.samples) @@ -580,7 +466,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { Outer: for p.Next() { - series := h.series[p.At()] + series := h.series.getByID(p.At()) for _, abs := range absent { if series.lset.Get(abs) != "" { @@ -607,111 +493,98 @@ Outer: // gc removes data before the minimum timestmap from the head. func (h *Head) gc() { - var ( - seriesRemoved int - chunksRemoved int - ) + defer runtime.GC() + // Only data strictly lower than this timestamp must be deleted. mint := h.MinTime() - deletedHashes := map[uint64][]uint64{} - - h.mtx.RLock() - - for hash, ss := range h.hashes { - for _, s := range ss { - s.mtx.Lock() - chunksRemoved += s.truncateChunksBefore(mint) - - if len(s.chunks) == 0 { - deletedHashes[hash] = append(deletedHashes[hash], s.ref) - } - s.mtx.Unlock() - } - } - - deletedIDs := make(map[uint64]struct{}, len(deletedHashes)) - - h.mtx.RUnlock() - - h.mtx.Lock() - defer h.mtx.Unlock() - - for hash, ids := range deletedHashes { - - inIDs := func(id uint64) bool { - for _, o := range ids { - if o == id { - return true - } - } - return false - } - var rem []*memSeries - - for _, s := range h.hashes[hash] { - if !inIDs(s.ref) { - rem = append(rem, s) - continue - } - deletedIDs[s.ref] = struct{}{} - // We switched locks and the series might have received new samples by now, - // check again. - s.mtx.Lock() - chkCount := len(s.chunks) - s.mtx.Unlock() - - if chkCount > 0 { - continue - } - delete(h.series, s.ref) - seriesRemoved++ - } - if len(rem) > 0 { - h.hashes[hash] = rem - } else { - delete(h.hashes, hash) - } - } - - for t, p := range h.postings.m { - repl := make([]uint64, 0, len(p)) - - for _, id := range p { - if _, ok := deletedIDs[id]; !ok { - repl = append(repl, id) - } - } - - if len(repl) == 0 { - delete(h.postings.m, t) - } else { - h.postings.m[t] = repl - } - } - - symbols := make(map[string]struct{}, len(h.symbols)) - values := make(map[string]stringset, len(h.values)) - - for t := range h.postings.m { - symbols[t.name] = struct{}{} - symbols[t.value] = struct{}{} - - ss, ok := values[t.name] - if !ok { - ss = stringset{} - values[t.name] = ss - } - ss.set(t.value) - } - - h.symbols = symbols - h.values = values + // Drop old chunks and remember series IDs and hashes if they can be + // deleted entirely. + deleted, chunksRemoved := h.series.gc(mint) + seriesRemoved := len(deleted) h.metrics.seriesRemoved.Add(float64(seriesRemoved)) h.metrics.series.Sub(float64(seriesRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved)) + + // Remove deleted series IDs from the postings lists. First do a collection + // run where we rebuild all postings that have something to delete + h.postings.mtx.RLock() + + type replEntry struct { + idx int + l []uint64 + } + collected := map[labels.Label]replEntry{} + + for t, p := range h.postings.m { + repl := replEntry{idx: len(p)} + + for i, id := range p { + if _, ok := deleted[id]; ok { + // First ID that got deleted, initialize replacement with + // all remaining IDs so far. + if repl.l == nil { + repl.l = make([]uint64, 0, len(p)) + repl.l = append(repl.l, p[:i]...) + } + continue + } + // Only add to the replacement once we know we have to do it. + if repl.l != nil { + repl.l = append(repl.l, id) + } + } + if repl.l != nil { + collected[t] = repl + } + } + + h.postings.mtx.RUnlock() + + // Replace all postings that have changed. Append all IDs that may have + // been added while we switched locks. + h.postings.mtx.Lock() + + for t, repl := range collected { + l := append(repl.l, h.postings.m[t][repl.idx:]...) + + if len(l) > 0 { + h.postings.m[t] = l + } else { + delete(h.postings.m, t) + } + } + + h.postings.mtx.Unlock() + + // Rebuild symbols and label value indices from what is left in the postings terms. + h.postings.mtx.RLock() + + symbols := make(map[string]struct{}, len(h.symbols)) + values := make(map[string]stringset, len(h.values)) + + for t := range h.postings.m { + symbols[t.Name] = struct{}{} + symbols[t.Value] = struct{}{} + + ss, ok := values[t.Name] + if !ok { + ss = stringset{} + values[t.Name] = ss + } + ss.set(t.Value) + } + + h.postings.mtx.RUnlock() + + h.symMtx.Lock() + + h.symbols = symbols + h.values = values + + h.symMtx.Unlock() } func (h *Head) Tombstones() TombstoneReader { @@ -779,11 +652,9 @@ func unpackChunkID(id uint64) (seriesID, chunkID uint64) { // Chunk returns the chunk for the reference number. func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { - h.head.mtx.RLock() - defer h.head.mtx.RUnlock() - sid, cid := unpackChunkID(ref) - s := h.head.series[sid] + + s := h.head.series.getByID(sid) s.mtx.RLock() c := s.chunk(int(cid)) @@ -843,19 +714,27 @@ func (h *headIndexReader) Close() error { } func (h *headIndexReader) Symbols() (map[string]struct{}, error) { - return h.head.symbols, nil + h.head.symMtx.RLock() + defer h.head.symMtx.RUnlock() + + res := make(map[string]struct{}, len(h.head.symbols)) + + for s := range h.head.symbols { + res[s] = struct{}{} + } + return res, nil } // LabelValues returns the possible label values func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) { - h.head.mtx.RLock() - defer h.head.mtx.RUnlock() - if len(names) != 1 { return nil, errInvalidSize } var sl []string + h.head.symMtx.RLock() + defer h.head.symMtx.RUnlock() + for s := range h.head.values[names[0]] { sl = append(sl, s) } @@ -866,17 +745,11 @@ func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) { // Postings returns the postings list iterator for the label pair. func (h *headIndexReader) Postings(name, value string) (Postings, error) { - h.head.mtx.RLock() - defer h.head.mtx.RUnlock() - - return h.head.postings.get(term{name: name, value: value}), nil + return h.head.postings.get(name, value), nil } func (h *headIndexReader) SortedPostings(p Postings) Postings { - h.head.mtx.RLock() - defer h.head.mtx.RUnlock() - - ep := make([]uint64, 0, 1024) + ep := make([]uint64, 0, 128) for p.Next() { ep = append(ep, p.At()) @@ -890,10 +763,10 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings { if err != nil { return false } - a, ok1 := h.head.series[ep[i]] - b, ok2 := h.head.series[ep[j]] + a := h.head.series.getByID(ep[i]) + b := h.head.series.getByID(ep[j]) - if !ok1 || !ok2 { + if a == nil || b == nil { err = errors.Errorf("series not found") return false } @@ -907,10 +780,8 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings { // Series returns the series for the given reference. func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { - h.head.mtx.RLock() - defer h.head.mtx.RUnlock() + s := h.head.series.getByID(ref) - s := h.head.series[ref] if s == nil { return ErrNotFound } @@ -937,8 +808,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM } func (h *headIndexReader) LabelIndices() ([][]string, error) { - h.head.mtx.RLock() - defer h.head.mtx.RUnlock() + h.head.symMtx.RLock() + defer h.head.symMtx.RUnlock() res := [][]string{} @@ -948,29 +819,24 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) { return res, nil } -// get retrieves the chunk with the hash and label set and creates -// a new one if it doesn't exist yet. -func (h *Head) get(hash uint64, lset labels.Labels) *memSeries { - series := h.hashes[hash] - - for _, s := range series { - if s.lset.Equals(lset) { - return s - } - } - return nil -} - func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { h.metrics.series.Inc() h.metrics.seriesCreated.Inc() + // Optimistically assume that we are the first one to create the series. id := atomic.AddUint64(&h.lastSeriesID, 1) - s := newMemSeries(lset, id, h.chunkRange) - h.series[id] = s - h.hashes[hash] = append(h.hashes[hash], s) + s, created := h.series.getOrSet(hash, s) + // Skip indexing if we didn't actually create the series. + if !created { + return s + } + + h.postings.add(id, lset) + + h.symMtx.Lock() + defer h.symMtx.Unlock() for _, l := range lset { valset, ok := h.values[l.Name] @@ -980,17 +846,179 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { } valset.set(l.Value) - h.postings.add(s.ref, term{name: l.Name, value: l.Value}) - h.symbols[l.Name] = struct{}{} h.symbols[l.Value] = struct{}{} } - h.postings.add(id, term{}) - return s } +// seriesHashmap is a simple hashmap for memSeries by their label set. It is built +// on top of a regular hashmap and holds a slice of series to resolve hash collisions. +// Its methods require the hash to be submitted with it to avoid re-computations throughout +// the code. +type seriesHashmap map[uint64][]*memSeries + +func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { + for _, s := range m[hash] { + if s.lset.Equals(lset) { + return s + } + } + return nil +} + +func (m seriesHashmap) set(hash uint64, s *memSeries) { + l := m[hash] + for i, prev := range l { + if prev.lset.Equals(s.lset) { + l[i] = s + return + } + } + m[hash] = append(l, s) +} + +func (m seriesHashmap) del(hash uint64, lset labels.Labels) { + var rem []*memSeries + for _, s := range m[hash] { + if !s.lset.Equals(lset) { + rem = append(rem, s) + } + } + if len(rem) == 0 { + delete(m, hash) + } else { + m[hash] = rem + } +} + +// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention. +// The locks are padded to not be on the same cache line. Filling the badded space +// with the maps was profiled to be slower – likely due to the additional pointer +// dereferences. +type stripeSeries struct { + series [stripeSize]map[uint64]*memSeries + hashes [stripeSize]seriesHashmap + locks [stripeSize]stripeLock +} + +const ( + stripeSize = 1 << 14 + stripeMask = stripeSize - 1 +) + +type stripeLock struct { + sync.RWMutex + // Padding to avoid multiple locks being on the same cache line. + _ [40]byte +} + +func newStripeSeries() *stripeSeries { + s := &stripeSeries{} + + for i := range s.series { + s.series[i] = map[uint64]*memSeries{} + } + for i := range s.hashes { + s.hashes[i] = seriesHashmap{} + } + return s +} + +// gc garbage collects old chunks that are strictly before mint and removes +// series entirely that have no chunks left. +func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { + var ( + deleted = map[uint64]struct{}{} + rmChunks = 0 + ) + // Run through all series and truncate old chunks. Mark those with no + // chunks left as deleted and store their ID and hash. + for i := 0; i < stripeSize; i++ { + s.locks[i].Lock() + + for hash, all := range s.hashes[i] { + for _, series := range all { + series.mtx.Lock() + rmChunks += series.truncateChunksBefore(mint) + + if len(series.chunks) > 0 { + series.mtx.Unlock() + continue + } + + // The series is gone entirely. We need to keep the series lock + // and make sure we have acquired the stripe locks for hash and ID of the + // series alike. + // If we don't hold them all, there's a very small chance that a series receives + // samples again while we are half-way into deleting it. + j := int(series.ref & stripeMask) + + if i != j { + s.locks[j].Lock() + } + + deleted[series.ref] = struct{}{} + s.hashes[i].del(hash, series.lset) + delete(s.series[j], series.ref) + + if i != j { + s.locks[j].Unlock() + } + + series.mtx.Unlock() + } + } + + s.locks[i].Unlock() + } + + return deleted, rmChunks +} + +func (s *stripeSeries) getByID(id uint64) *memSeries { + i := id & stripeMask + + s.locks[i].RLock() + series := s.series[i][id] + s.locks[i].RUnlock() + + return series +} + +func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries { + i := hash & stripeMask + + s.locks[i].RLock() + series := s.hashes[i].get(hash, lset) + s.locks[i].RUnlock() + + return series +} + +func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) { + i := hash & stripeMask + + s.locks[i].Lock() + + if prev := s.hashes[i].get(hash, series.lset); prev != nil { + return prev, false + } + s.hashes[i].set(hash, series) + + s.hashes[i][hash] = append(s.hashes[i][hash], series) + s.locks[i].Unlock() + + i = series.ref & stripeMask + + s.locks[i].Lock() + s.series[i][series.ref] = series + s.locks[i].Unlock() + + return series, true +} + type sample struct { t int64 v float64 diff --git a/head_test.go b/head_test.go index 50aa80e6e..724dab224 100644 --- a/head_test.go +++ b/head_test.go @@ -117,22 +117,22 @@ func TestHead_Truncate(t *testing.T) { require.Equal(t, []*memChunk{ {minTime: 2000, maxTime: 2999}, - }, h.series[s1.ref].chunks) + }, h.series.getByID(s1.ref).chunks) require.Equal(t, []*memChunk{ {minTime: 2000, maxTime: 2999}, {minTime: 3000, maxTime: 3999}, - }, h.series[s2.ref].chunks) + }, h.series.getByID(s2.ref).chunks) - require.Nil(t, h.series[s3.ref]) - require.Nil(t, h.series[s4.ref]) + require.Nil(t, h.series.getByID(s3.ref)) + require.Nil(t, h.series.getByID(s4.ref)) - postingsA1, _ := expandPostings(h.postings.get(term{"a", "1"})) - postingsA2, _ := expandPostings(h.postings.get(term{"a", "2"})) - postingsB1, _ := expandPostings(h.postings.get(term{"b", "1"})) - postingsB2, _ := expandPostings(h.postings.get(term{"b", "2"})) - postingsC1, _ := expandPostings(h.postings.get(term{"c", "1"})) - postingsAll, _ := expandPostings(h.postings.get(term{"", ""})) + postingsA1, _ := expandPostings(h.postings.get("a", "1")) + postingsA2, _ := expandPostings(h.postings.get("a", "2")) + postingsB1, _ := expandPostings(h.postings.get("b", "1")) + postingsB2, _ := expandPostings(h.postings.get("b", "2")) + postingsC1, _ := expandPostings(h.postings.get("c", "1")) + postingsAll, _ := expandPostings(h.postings.get("", "")) require.Equal(t, []uint64{s1.ref}, postingsA1) require.Equal(t, []uint64{s2.ref}, postingsA2) @@ -517,7 +517,7 @@ func boundedSamples(full []sample, mint, maxt int64) []sample { full = full[1:] } for i, s := range full { - // Terminate on the first sample larger than maxt. + // labels.Labelinate on the first sample larger than maxt. if s.t > maxt { return full[:i] } diff --git a/index_test.go b/index_test.go index 9f3e14faa..12329b560 100644 --- a/index_test.go +++ b/index_test.go @@ -43,7 +43,7 @@ func newMockIndex() mockIndex { return mockIndex{ series: make(map[uint64]series), labelIndex: make(map[string][]string), - postings: &memPostings{m: make(map[term][]uint64)}, + postings: newMemPostings(), symbols: make(map[string]struct{}), } } @@ -84,14 +84,14 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error { } func (m mockIndex) WritePostings(name, value string, it Postings) error { - if _, ok := m.postings.m[term{name, value}]; ok { + if _, ok := m.postings.m[labels.Label{name, value}]; ok { return errors.Errorf("postings for %s=%q already added", name, value) } ep, err := expandPostings(it) if err != nil { return err } - m.postings.m[term{name, value}] = ep + m.postings.m[labels.Label{name, value}] = ep return it.Err() } @@ -110,7 +110,7 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) { } func (m mockIndex) Postings(name, value string) (Postings, error) { - return m.postings.get(term{name, value}), nil + return m.postings.get(name, value), nil } func (m mockIndex) SortedPostings(p Postings) Postings { @@ -274,7 +274,7 @@ func TestPersistence_index_e2e(t *testing.T) { // Population procedure as done by compaction. var ( - postings = &memPostings{m: make(map[term][]uint64, 512)} + postings = newMemPostings() values = map[string]stringset{} ) @@ -292,9 +292,8 @@ func TestPersistence_index_e2e(t *testing.T) { values[l.Name] = valset } valset.set(l.Value) - - postings.add(uint64(i), term{name: l.Name, value: l.Value}) } + postings.add(uint64(i), s.labels) i++ } @@ -313,10 +312,10 @@ func TestPersistence_index_e2e(t *testing.T) { require.NoError(t, err) mi.WritePostings("", "", newListPostings(all)) - for tm := range postings.m { - err = iw.WritePostings(tm.name, tm.value, postings.get(tm)) + for l := range postings.m { + err = iw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)) require.NoError(t, err) - mi.WritePostings(tm.name, tm.value, postings.get(tm)) + mi.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)) } err = iw.Close() @@ -326,10 +325,10 @@ func TestPersistence_index_e2e(t *testing.T) { require.NoError(t, err) for p := range mi.postings.m { - gotp, err := ir.Postings(p.name, p.value) + gotp, err := ir.Postings(p.Name, p.Value) require.NoError(t, err) - expp, err := mi.Postings(p.name, p.value) + expp, err := mi.Postings(p.Name, p.Value) var lset, explset labels.Labels var chks, expchks []ChunkMeta diff --git a/labels/labels_test.go b/labels/labels_test.go index 9a613b681..e08c8e459 100644 --- a/labels/labels_test.go +++ b/labels/labels_test.go @@ -218,3 +218,25 @@ func BenchmarkLabelSetEquals(b *testing.B) { } _ = res } + +func BenchmarkLabelSetHash(b *testing.B) { + // The vast majority of comparisons will be against a matching label set. + m := map[string]string{ + "job": "node", + "instance": "123.123.1.211:9090", + "path": "/api/v1/namespaces//deployments/", + "method": "GET", + "namespace": "system", + "status": "500", + } + ls := FromMap(m) + var res uint64 + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + res += ls.Hash() + } + fmt.Println(res) +} diff --git a/postings.go b/postings.go index e3ca75de9..97a29ab19 100644 --- a/postings.go +++ b/postings.go @@ -17,31 +17,47 @@ import ( "encoding/binary" "sort" "strings" + "sync" + + "github.com/prometheus/tsdb/labels" ) type memPostings struct { - m map[term][]uint64 + mtx sync.RWMutex + m map[labels.Label][]uint64 } -type term struct { - name, value string +func newMemPostings() *memPostings { + return &memPostings{ + m: make(map[labels.Label][]uint64, 512), + } } // Postings returns an iterator over the postings list for s. -func (p *memPostings) get(t term) Postings { - l := p.m[t] +func (p *memPostings) get(name, value string) Postings { + p.mtx.RLock() + l := p.m[labels.Label{Name: name, Value: value}] + p.mtx.RUnlock() + if l == nil { return emptyPostings } return newListPostings(l) } +var allLabel = labels.Label{} + // add adds a document to the index. The caller has to ensure that no // term argument appears twice. -func (p *memPostings) add(id uint64, terms ...term) { - for _, t := range terms { - p.m[t] = append(p.m[t], id) +func (p *memPostings) add(id uint64, lset labels.Labels) { + p.mtx.Lock() + + for _, l := range lset { + p.m[l] = append(p.m[l], id) } + p.m[allLabel] = append(p.m[allLabel], id) + + p.mtx.Unlock() } // Postings provides iterative access over a postings list. diff --git a/querier_test.go b/querier_test.go index 3f243ad69..e46f3b947 100644 --- a/querier_test.go +++ b/querier_test.go @@ -228,7 +228,7 @@ func createIdxChkReaders(tc []struct { return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 }) - postings := &memPostings{m: make(map[term][]uint64, 512)} + postings := newMemPostings() chkReader := mockChunkReader(make(map[uint64]chunks.Chunk)) lblIdx := make(map[string]stringset) mi := newMockIndex() @@ -257,10 +257,9 @@ func createIdxChkReaders(tc []struct { ls := labels.FromMap(s.lset) mi.AddSeries(uint64(i), ls, metas...) - postings.add(uint64(i), term{}) - for _, l := range ls { - postings.add(uint64(i), term{l.Name, l.Value}) + postings.add(uint64(i), ls) + for _, l := range ls { vs, present := lblIdx[l.Name] if !present { vs = stringset{} @@ -274,8 +273,8 @@ func createIdxChkReaders(tc []struct { mi.WriteLabelIndex([]string{l}, vs.slice()) } - for tm := range postings.m { - mi.WritePostings(tm.name, tm.value, postings.get(tm)) + for l := range postings.m { + mi.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)) } return mi, chkReader diff --git a/test/hash_test.go b/test/hash_test.go index b5f57a5a8..fa448b4c6 100644 --- a/test/hash_test.go +++ b/test/hash_test.go @@ -60,7 +60,6 @@ func BenchmarkHash(b *testing.B) { } }) } - } // hashAdd adds a string to a fnv64a hash value, returning the updated hash. diff --git a/wal.go b/wal.go index 2af7d542a..51f52e876 100644 --- a/wal.go +++ b/wal.go @@ -111,6 +111,9 @@ type WALReader interface { type RefSeries struct { Ref uint64 Labels labels.Labels + + // hash for the label set. This field is not generally populated. + hash uint64 } // RefSample is a timestamp/value pair associated with a reference to a series. @@ -118,6 +121,8 @@ type RefSample struct { Ref uint64 T int64 V float64 + + series *memSeries } type segmentFile struct {