Replace single head lock with granular locks

This adds various new locks to replace the single big lock on
the head. All parts must now be copy-on-write (COW), as they may
still be held by clients after initial retrieval.
Series, indexed by ID and by hash, are now guarded by striped locks to
reduce contention and the total lock holding time during GC. This should
reduce starvation of readers.
This commit is contained in:
Fabian Reinartz 2017-09-05 11:45:18 +02:00
parent 1ddedf2b30
commit c36d574290
12 changed files with 436 additions and 361 deletions

View file

@ -139,7 +139,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
dur := measureTime("ingestScrapes", func() { dur := measureTime("ingestScrapes", func() {
b.startProfiling() b.startProfiling()
total, err = b.ingestScrapes(metrics, 3000) total, err = b.ingestScrapes(metrics, 2000)
if err != nil { if err != nil {
exitWithError(err) exitWithError(err)
} }
@ -199,7 +199,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
type sample struct { type sample struct {
labels labels.Labels labels labels.Labels
value int64 value int64
ref *string ref *uint64
} }
scrape := make([]*sample, 0, len(metrics)) scrape := make([]*sample, 0, len(metrics))

View file

@ -17,6 +17,7 @@ import (
"math/rand" "math/rand"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"time" "time"
@ -364,6 +365,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
} }
c.metrics.ran.Inc() c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds()) c.metrics.duration.Observe(time.Since(t).Seconds())
// We might have done quite a few allocs. Enforce a GC so they do not accumulate
// with subsequent compactions or head GCs.
runtime.GC()
}(time.Now()) }(time.Now())
dir := filepath.Join(dest, meta.ULID.String()) dir := filepath.Join(dest, meta.ULID.String())
@ -477,7 +482,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
// We fully rebuild the postings list index from merged series. // We fully rebuild the postings list index from merged series.
var ( var (
postings = &memPostings{m: make(map[term][]uint64, 512)} postings = newMemPostings()
values = map[string]stringset{} values = map[string]stringset{}
i = uint64(0) i = uint64(0)
) )
@ -539,11 +544,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
values[l.Name] = valset values[l.Name] = valset
} }
valset.set(l.Value) valset.set(l.Value)
t := term{name: l.Name, value: l.Value}
postings.add(i, t)
} }
postings.add(i, lset)
i++ i++
} }
if set.Err() != nil { if set.Err() != nil {
@ -562,8 +565,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
} }
for t := range postings.m { for l := range postings.m {
if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil {
return errors.Wrap(err, "write postings") return errors.Wrap(err, "write postings")
} }
} }

7
db.go
View file

@ -21,7 +21,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
@ -76,11 +75,11 @@ type Appender interface {
// to AddFast() at any point. Adding the sample via Add() returns a new // to AddFast() at any point. Adding the sample via Add() returns a new
// reference number. // reference number.
// If the reference is the empty string it must not be used for caching. // If the reference is 0 it must not be used for caching.
Add(l labels.Labels, t int64, v float64) (string, error) Add(l labels.Labels, t int64, v float64) (uint64, error)
// AddFast adds a sample pair for the referenced series. It is generally faster // AddFast adds a sample pair for the referenced series. It is generally faster
// than adding a sample by providing its full label set. // than adding a sample by providing its full label set.
AddFast(ref string, t int64, v float64) error AddFast(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch. // Commit submits the collected samples and purges the batch.
Commit() error Commit() error
@ -350,7 +349,6 @@ func (db *DB) compact() (changes bool, err error) {
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks") return changes, errors.Wrap(err, "reload blocks")
} }
runtime.GC()
} }
// Check for compactions of multiple blocks. // Check for compactions of multiple blocks.
@ -383,7 +381,6 @@ func (db *DB) compact() (changes bool, err error) {
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks") return changes, errors.Wrap(err, "reload blocks")
} }
runtime.GC()
} }
return changes, nil return changes, nil

View file

@ -110,27 +110,32 @@ func TestDBAppenderAddRef(t *testing.T) {
app1 := db.Appender() app1 := db.Appender()
ref, err := app1.Add(labels.FromStrings("a", "b"), 123, 0) ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
require.NoError(t, err) require.NoError(t, err)
// When a series is first created, refs don't work within that transaction. // Reference should already work before commit.
err = app1.AddFast(ref, 1, 1) err = app1.AddFast(ref1, 124, 1)
require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) require.NoError(t, err)
err = app1.Commit() err = app1.Commit()
require.NoError(t, err) require.NoError(t, err)
app2 := db.Appender() app2 := db.Appender()
ref, err = app2.Add(labels.FromStrings("a", "b"), 133, 1)
// first ref should already work in next transaction.
err = app2.AddFast(ref1, 125, 0)
require.NoError(t, err) require.NoError(t, err)
ref2, err := app2.Add(labels.FromStrings("a", "b"), 133, 1)
require.NoError(t, err)
require.True(t, ref1 == ref2)
// Reference must be valid to add another sample. // Reference must be valid to add another sample.
err = app2.AddFast(ref, 143, 2) err = app2.AddFast(ref2, 143, 2)
require.NoError(t, err) require.NoError(t, err)
// AddFast for the same timestamp must fail if the generation in the reference err = app2.AddFast(9999999, 1, 1)
// doesn't add up.
err = app2.AddFast("abc_invalid_xyz", 1, 1)
require.EqualError(t, errors.Cause(err), ErrNotFound.Error()) require.EqualError(t, errors.Cause(err), ErrNotFound.Error())
require.NoError(t, app2.Commit()) require.NoError(t, app2.Commit())
@ -141,6 +146,8 @@ func TestDBAppenderAddRef(t *testing.T) {
require.Equal(t, map[string][]sample{ require.Equal(t, map[string][]sample{
labels.FromStrings("a", "b").String(): []sample{ labels.FromStrings("a", "b").String(): []sample{
{t: 123, v: 0}, {t: 123, v: 0},
{t: 124, v: 1},
{t: 125, v: 0},
{t: 133, v: 1}, {t: 133, v: 1},
{t: 143, v: 2}, {t: 143, v: 2},
}, },

628
head.go
View file

@ -14,8 +14,8 @@
package tsdb package tsdb
import ( import (
"encoding/binary"
"math" "math"
"runtime"
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -48,7 +48,6 @@ var (
// Head handles reads and writes of time series data within a time window. // Head handles reads and writes of time series data within a time window.
type Head struct { type Head struct {
chunkRange int64 chunkRange int64
mtx sync.RWMutex
metrics *headMetrics metrics *headMetrics
wal WAL wal WAL
logger log.Logger logger log.Logger
@ -57,16 +56,14 @@ type Head struct {
minTime, maxTime int64 minTime, maxTime int64
lastSeriesID uint64 lastSeriesID uint64
// descs holds all chunk descs for the head block. Each chunk implicitly // All series addressable by their ID or hash.
// is assigned the index as its ID. series *stripeSeries
series map[uint64]*memSeries
// hashes contains a collision map of label set hashes of chunks
// to their chunk descs.
hashes map[uint64][]*memSeries
symbols map[string]struct{} symMtx sync.RWMutex
values map[string]stringset // label names to possible values symbols map[string]struct{}
postings *memPostings // postings lists for terms values map[string]stringset // label names to possible values
postings *memPostings // postings lists for terms
tombstones tombstoneReader tombstones tombstoneReader
} }
@ -178,11 +175,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
chunkRange: chunkRange, chunkRange: chunkRange,
minTime: math.MaxInt64, minTime: math.MaxInt64,
maxTime: math.MinInt64, maxTime: math.MinInt64,
series: map[uint64]*memSeries{}, series: newStripeSeries(),
hashes: map[uint64][]*memSeries{},
values: map[string]stringset{}, values: map[string]stringset{},
symbols: map[string]struct{}{}, symbols: map[string]struct{}{},
postings: &memPostings{m: make(map[term][]uint64)}, postings: newMemPostings(),
tombstones: newEmptyTombstoneReader(), tombstones: newEmptyTombstoneReader(),
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
@ -201,8 +197,8 @@ func (h *Head) readWAL() error {
} }
samplesFunc := func(samples []RefSample) error { samplesFunc := func(samples []RefSample) error {
for _, s := range samples { for _, s := range samples {
ms, ok := h.series[s.Ref] ms := h.series.getByID(s.Ref)
if !ok { if ms == nil {
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
} }
_, chunkCreated := ms.append(s.T, s.V) _, chunkCreated := ms.append(s.T, s.V)
@ -291,7 +287,7 @@ type initAppender struct {
head *Head head *Head
} }
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if a.app != nil { if a.app != nil {
return a.app.Add(lset, t, v) return a.app.Add(lset, t, v)
} }
@ -301,7 +297,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, erro
return a.app.Add(lset, t, v) return a.app.Add(lset, t, v)
} }
func (a *initAppender) AddFast(ref string, t int64, v float64) error { func (a *initAppender) AddFast(ref uint64, t int64, v float64) error {
if a.app == nil { if a.app == nil {
return ErrNotFound return ErrNotFound
} }
@ -335,8 +331,6 @@ func (h *Head) Appender() Appender {
} }
func (h *Head) appender() *headAppender { func (h *Head) appender() *headAppender {
h.mtx.RLock()
return &headAppender{ return &headAppender{
head: h, head: h,
mint: h.MaxTime() - h.chunkRange/2, mint: h.MaxTime() - h.chunkRange/2,
@ -361,177 +355,71 @@ type headAppender struct {
head *Head head *Head
mint int64 mint int64
newSeries []*hashedLabels series []RefSeries
createdSeries []RefSeries
newHashes map[uint64]uint64
samples []RefSample samples []RefSample
highTimestamp int64 highTimestamp int64
} }
type hashedLabels struct { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
ref uint64
hash uint64
labels labels.Labels
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if t < a.mint { if t < a.mint {
return "", ErrOutOfBounds return 0, ErrOutOfBounds
} }
hash := lset.Hash() hash := lset.Hash()
refb := make([]byte, 8)
// Series exists already in the block. s := a.head.series.getByHash(hash, lset)
if ms := a.head.get(hash, lset); ms != nil {
binary.BigEndian.PutUint64(refb, uint64(ms.ref)) if s == nil {
return string(refb), a.AddFast(string(refb), t, v) s = a.head.create(hash, lset)
a.series = append(a.series, RefSeries{
Ref: s.ref,
Labels: lset,
hash: hash,
})
} }
// Series was added in this transaction previously. return s.ref, a.AddFast(s.ref, t, v)
if ref, ok := a.newHashes[hash]; ok {
binary.BigEndian.PutUint64(refb, ref)
// XXX(fabxc): there's no fast path for multiple samples for the same new series
// in the same transaction. We always return the invalid empty ref. It's has not
// been a relevant use case so far and is not worth the trouble.
return "", a.AddFast(string(refb), t, v)
}
// The series is completely new.
if a.newSeries == nil {
a.newHashes = map[uint64]uint64{}
}
// First sample for new series.
ref := uint64(len(a.newSeries))
a.newSeries = append(a.newSeries, &hashedLabels{
ref: ref,
hash: hash,
labels: lset,
})
// First bit indicates its a series created in this transaction.
ref |= (1 << 63)
a.newHashes[hash] = ref
binary.BigEndian.PutUint64(refb, ref)
return "", a.AddFast(string(refb), t, v)
} }
func (a *headAppender) AddFast(ref string, t int64, v float64) error { func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if len(ref) != 8 { s := a.head.series.getByID(ref)
return errors.Wrap(ErrNotFound, "invalid ref length")
if s == nil {
return errors.Wrap(ErrNotFound, "unknown series")
} }
var ( if err := s.appendable(t, v); err != nil {
refn = binary.BigEndian.Uint64(yoloBytes(ref)) return err
id = (refn << 1) >> 1
inTx = refn&(1<<63) != 0
)
// Distinguish between existing series and series created in
// this transaction.
if inTx {
if id > uint64(len(a.newSeries)-1) {
return errors.Wrap(ErrNotFound, "transaction series ID too high")
}
// TODO(fabxc): we also have to validate here that the
// sample sequence is valid.
// We also have to revalidate it as we switch locks and create
// the new series.
} else {
ms, ok := a.head.series[id]
if !ok {
return errors.Wrap(ErrNotFound, "unknown series")
}
if err := ms.appendable(t, v); err != nil {
return err
}
} }
if t < a.mint { if t < a.mint {
return ErrOutOfBounds return ErrOutOfBounds
} }
if t > a.highTimestamp { if t > a.highTimestamp {
a.highTimestamp = t a.highTimestamp = t
} }
a.samples = append(a.samples, RefSample{ a.samples = append(a.samples, RefSample{
Ref: refn, Ref: ref,
T: t, T: t,
V: v, V: v,
series: s,
}) })
return nil return nil
} }
func (a *headAppender) createSeries() error {
if len(a.newSeries) == 0 {
return nil
}
a.createdSeries = make([]RefSeries, 0, len(a.newSeries))
base0 := len(a.head.series)
a.head.mtx.RUnlock()
defer a.head.mtx.RLock()
a.head.mtx.Lock()
defer a.head.mtx.Unlock()
base1 := len(a.head.series)
for _, l := range a.newSeries {
// We switched locks and have to re-validate that the series were not
// created by another goroutine in the meantime.
if base1 > base0 {
if ms := a.head.get(l.hash, l.labels); ms != nil {
l.ref = uint64(ms.ref)
continue
}
}
// Series is still new.
s := a.head.create(l.hash, l.labels)
l.ref = uint64(s.ref)
a.createdSeries = append(a.createdSeries, RefSeries{Ref: l.ref, Labels: l.labels})
}
// Write all new series to the WAL.
if err := a.head.wal.LogSeries(a.createdSeries); err != nil {
return errors.Wrap(err, "WAL log series")
}
return nil
}
func (a *headAppender) Commit() error { func (a *headAppender) Commit() error {
defer a.head.mtx.RUnlock() defer a.Rollback()
defer a.head.metrics.activeAppenders.Dec() if err := a.head.wal.LogSeries(a.series); err != nil {
defer a.head.putAppendBuffer(a.samples)
if err := a.createSeries(); err != nil {
return err return err
} }
// We have to update the refs of samples for series we just created.
for i := range a.samples {
s := &a.samples[i]
if s.Ref&(1<<63) != 0 {
s.Ref = a.newSeries[(s.Ref<<1)>>1].ref
}
}
// Write all new samples to the WAL and add them to the
// in-mem database on success.
if err := a.head.wal.LogSamples(a.samples); err != nil { if err := a.head.wal.LogSamples(a.samples); err != nil {
return errors.Wrap(err, "WAL log samples") return errors.Wrap(err, "WAL log samples")
} }
total := uint64(len(a.samples)) total := len(a.samples)
for _, s := range a.samples { for _, s := range a.samples {
series, ok := a.head.series[s.Ref] ok, chunkCreated := s.series.append(s.T, s.V)
if !ok {
return errors.Errorf("series with ID %d not found", s.Ref)
}
ok, chunkCreated := series.append(s.T, s.V)
if !ok { if !ok {
total-- total--
} }
@ -557,8 +445,6 @@ func (a *headAppender) Commit() error {
} }
func (a *headAppender) Rollback() error { func (a *headAppender) Rollback() error {
a.head.mtx.RUnlock()
a.head.metrics.activeAppenders.Dec() a.head.metrics.activeAppenders.Dec()
a.head.putAppendBuffer(a.samples) a.head.putAppendBuffer(a.samples)
@ -580,7 +466,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
Outer: Outer:
for p.Next() { for p.Next() {
series := h.series[p.At()] series := h.series.getByID(p.At())
for _, abs := range absent { for _, abs := range absent {
if series.lset.Get(abs) != "" { if series.lset.Get(abs) != "" {
@ -607,111 +493,98 @@ Outer:
// gc removes data before the minimum timestamp from the head. // gc removes data before the minimum timestamp from the head.
func (h *Head) gc() { func (h *Head) gc() {
var ( defer runtime.GC()
seriesRemoved int
chunksRemoved int
)
// Only data strictly lower than this timestamp must be deleted. // Only data strictly lower than this timestamp must be deleted.
mint := h.MinTime() mint := h.MinTime()
deletedHashes := map[uint64][]uint64{} // Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
h.mtx.RLock() deleted, chunksRemoved := h.series.gc(mint)
seriesRemoved := len(deleted)
for hash, ss := range h.hashes {
for _, s := range ss {
s.mtx.Lock()
chunksRemoved += s.truncateChunksBefore(mint)
if len(s.chunks) == 0 {
deletedHashes[hash] = append(deletedHashes[hash], s.ref)
}
s.mtx.Unlock()
}
}
deletedIDs := make(map[uint64]struct{}, len(deletedHashes))
h.mtx.RUnlock()
h.mtx.Lock()
defer h.mtx.Unlock()
for hash, ids := range deletedHashes {
inIDs := func(id uint64) bool {
for _, o := range ids {
if o == id {
return true
}
}
return false
}
var rem []*memSeries
for _, s := range h.hashes[hash] {
if !inIDs(s.ref) {
rem = append(rem, s)
continue
}
deletedIDs[s.ref] = struct{}{}
// We switched locks and the series might have received new samples by now,
// check again.
s.mtx.Lock()
chkCount := len(s.chunks)
s.mtx.Unlock()
if chkCount > 0 {
continue
}
delete(h.series, s.ref)
seriesRemoved++
}
if len(rem) > 0 {
h.hashes[hash] = rem
} else {
delete(h.hashes, hash)
}
}
for t, p := range h.postings.m {
repl := make([]uint64, 0, len(p))
for _, id := range p {
if _, ok := deletedIDs[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) == 0 {
delete(h.postings.m, t)
} else {
h.postings.m[t] = repl
}
}
symbols := make(map[string]struct{}, len(h.symbols))
values := make(map[string]stringset, len(h.values))
for t := range h.postings.m {
symbols[t.name] = struct{}{}
symbols[t.value] = struct{}{}
ss, ok := values[t.name]
if !ok {
ss = stringset{}
values[t.name] = ss
}
ss.set(t.value)
}
h.symbols = symbols
h.values = values
h.metrics.seriesRemoved.Add(float64(seriesRemoved)) h.metrics.seriesRemoved.Add(float64(seriesRemoved))
h.metrics.series.Sub(float64(seriesRemoved)) h.metrics.series.Sub(float64(seriesRemoved))
h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved))
// Remove deleted series IDs from the postings lists. First do a collection
// run where we rebuild all postings that have something to delete
h.postings.mtx.RLock()
type replEntry struct {
idx int
l []uint64
}
collected := map[labels.Label]replEntry{}
for t, p := range h.postings.m {
repl := replEntry{idx: len(p)}
for i, id := range p {
if _, ok := deleted[id]; ok {
// First ID that got deleted, initialize replacement with
// all IDs that precede it in the list.
if repl.l == nil {
repl.l = make([]uint64, 0, len(p))
repl.l = append(repl.l, p[:i]...)
}
continue
}
// Only add to the replacement once we know we have to do it.
if repl.l != nil {
repl.l = append(repl.l, id)
}
}
if repl.l != nil {
collected[t] = repl
}
}
h.postings.mtx.RUnlock()
// Replace all postings that have changed. Append all IDs that may have
// been added while we switched locks.
h.postings.mtx.Lock()
for t, repl := range collected {
l := append(repl.l, h.postings.m[t][repl.idx:]...)
if len(l) > 0 {
h.postings.m[t] = l
} else {
delete(h.postings.m, t)
}
}
h.postings.mtx.Unlock()
// Rebuild symbols and label value indices from what is left in the postings terms.
h.postings.mtx.RLock()
symbols := make(map[string]struct{}, len(h.symbols))
values := make(map[string]stringset, len(h.values))
for t := range h.postings.m {
symbols[t.Name] = struct{}{}
symbols[t.Value] = struct{}{}
ss, ok := values[t.Name]
if !ok {
ss = stringset{}
values[t.Name] = ss
}
ss.set(t.Value)
}
h.postings.mtx.RUnlock()
h.symMtx.Lock()
h.symbols = symbols
h.values = values
h.symMtx.Unlock()
} }
func (h *Head) Tombstones() TombstoneReader { func (h *Head) Tombstones() TombstoneReader {
@ -779,11 +652,9 @@ func unpackChunkID(id uint64) (seriesID, chunkID uint64) {
// Chunk returns the chunk for the reference number. // Chunk returns the chunk for the reference number.
func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
h.head.mtx.RLock()
defer h.head.mtx.RUnlock()
sid, cid := unpackChunkID(ref) sid, cid := unpackChunkID(ref)
s := h.head.series[sid]
s := h.head.series.getByID(sid)
s.mtx.RLock() s.mtx.RLock()
c := s.chunk(int(cid)) c := s.chunk(int(cid))
@ -843,19 +714,27 @@ func (h *headIndexReader) Close() error {
} }
func (h *headIndexReader) Symbols() (map[string]struct{}, error) { func (h *headIndexReader) Symbols() (map[string]struct{}, error) {
return h.head.symbols, nil h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
res := make(map[string]struct{}, len(h.head.symbols))
for s := range h.head.symbols {
res[s] = struct{}{}
}
return res, nil
} }
// LabelValues returns the possible label values // LabelValues returns the possible label values
func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) { func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) {
h.head.mtx.RLock()
defer h.head.mtx.RUnlock()
if len(names) != 1 { if len(names) != 1 {
return nil, errInvalidSize return nil, errInvalidSize
} }
var sl []string var sl []string
h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
for s := range h.head.values[names[0]] { for s := range h.head.values[names[0]] {
sl = append(sl, s) sl = append(sl, s)
} }
@ -866,17 +745,11 @@ func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) {
// Postings returns the postings list iterator for the label pair. // Postings returns the postings list iterator for the label pair.
func (h *headIndexReader) Postings(name, value string) (Postings, error) { func (h *headIndexReader) Postings(name, value string) (Postings, error) {
h.head.mtx.RLock() return h.head.postings.get(name, value), nil
defer h.head.mtx.RUnlock()
return h.head.postings.get(term{name: name, value: value}), nil
} }
func (h *headIndexReader) SortedPostings(p Postings) Postings { func (h *headIndexReader) SortedPostings(p Postings) Postings {
h.head.mtx.RLock() ep := make([]uint64, 0, 128)
defer h.head.mtx.RUnlock()
ep := make([]uint64, 0, 1024)
for p.Next() { for p.Next() {
ep = append(ep, p.At()) ep = append(ep, p.At())
@ -890,10 +763,10 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
if err != nil { if err != nil {
return false return false
} }
a, ok1 := h.head.series[ep[i]] a := h.head.series.getByID(ep[i])
b, ok2 := h.head.series[ep[j]] b := h.head.series.getByID(ep[j])
if !ok1 || !ok2 { if a == nil || b == nil {
err = errors.Errorf("series not found") err = errors.Errorf("series not found")
return false return false
} }
@ -907,10 +780,8 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
// Series returns the series for the given reference. // Series returns the series for the given reference.
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
h.head.mtx.RLock() s := h.head.series.getByID(ref)
defer h.head.mtx.RUnlock()
s := h.head.series[ref]
if s == nil { if s == nil {
return ErrNotFound return ErrNotFound
} }
@ -937,8 +808,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM
} }
func (h *headIndexReader) LabelIndices() ([][]string, error) { func (h *headIndexReader) LabelIndices() ([][]string, error) {
h.head.mtx.RLock() h.head.symMtx.RLock()
defer h.head.mtx.RUnlock() defer h.head.symMtx.RUnlock()
res := [][]string{} res := [][]string{}
@ -948,29 +819,24 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
return res, nil return res, nil
} }
// get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet.
func (h *Head) get(hash uint64, lset labels.Labels) *memSeries {
series := h.hashes[hash]
for _, s := range series {
if s.lset.Equals(lset) {
return s
}
}
return nil
}
func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
h.metrics.series.Inc() h.metrics.series.Inc()
h.metrics.seriesCreated.Inc() h.metrics.seriesCreated.Inc()
// Optimistically assume that we are the first one to create the series.
id := atomic.AddUint64(&h.lastSeriesID, 1) id := atomic.AddUint64(&h.lastSeriesID, 1)
s := newMemSeries(lset, id, h.chunkRange) s := newMemSeries(lset, id, h.chunkRange)
h.series[id] = s
h.hashes[hash] = append(h.hashes[hash], s) s, created := h.series.getOrSet(hash, s)
// Skip indexing if we didn't actually create the series.
if !created {
return s
}
h.postings.add(id, lset)
h.symMtx.Lock()
defer h.symMtx.Unlock()
for _, l := range lset { for _, l := range lset {
valset, ok := h.values[l.Name] valset, ok := h.values[l.Name]
@ -980,17 +846,179 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
} }
valset.set(l.Value) valset.set(l.Value)
h.postings.add(s.ref, term{name: l.Name, value: l.Value})
h.symbols[l.Name] = struct{}{} h.symbols[l.Name] = struct{}{}
h.symbols[l.Value] = struct{}{} h.symbols[l.Value] = struct{}{}
} }
h.postings.add(id, term{})
return s return s
} }
// seriesHashmap is a simple hashmap for memSeries keyed by the hash of their
// label set. It is built on top of a regular map and keeps a slice of series
// per hash so that label sets with colliding hashes can be told apart.
// Its methods require the hash to be passed in alongside the label set to
// avoid re-computing it throughout the code.
type seriesHashmap map[uint64][]*memSeries
// get looks up the series stored under hash whose label set equals lset.
// It returns nil if no such series exists.
func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
	bucket := m[hash]
	for idx := 0; idx < len(bucket); idx++ {
		if candidate := bucket[idx]; candidate.lset.Equals(lset) {
			return candidate
		}
	}
	return nil
}
// set stores s under hash. An entry in the same bucket with an equal label
// set is overwritten in place; otherwise s is appended to the bucket.
func (m seriesHashmap) set(hash uint64, s *memSeries) {
	bucket := m[hash]
	for idx := range bucket {
		if !bucket[idx].lset.Equals(s.lset) {
			continue
		}
		bucket[idx] = s
		return
	}
	m[hash] = append(bucket, s)
}
// del removes every series with a label set equal to lset from the bucket
// for hash. The bucket is rebuilt into a fresh slice; if nothing is left,
// the key is dropped from the map altogether.
func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
	var kept []*memSeries
	for _, candidate := range m[hash] {
		if candidate.lset.Equals(lset) {
			continue
		}
		kept = append(kept, candidate)
	}
	if len(kept) > 0 {
		m[hash] = kept
		return
	}
	delete(m, hash)
}
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
// The locks are padded to not be on the same cache line. Filling the padded space
// with the maps was profiled to be slower likely due to the additional pointer
// dereferences.
type stripeSeries struct {
// series holds the series by ID; the stripe is selected by id & stripeMask.
series [stripeSize]map[uint64]*memSeries
// hashes holds the series by label set hash; the stripe is selected by
// hash & stripeMask.
hashes [stripeSize]seriesHashmap
// locks[i] guards both series[i] and hashes[i].
locks [stripeSize]stripeLock
}
const (
// stripeSize is the number of stripes (and thus locks) in a stripeSeries.
// It is a power of two so that a stripe index can be derived with a mask.
stripeSize = 1 << 14
// stripeMask is ANDed with a series ID or hash to obtain its stripe index.
stripeMask = stripeSize - 1
)
// stripeLock is the lock type used for each stripe of a stripeSeries.
type stripeLock struct {
sync.RWMutex
// Padding to avoid multiple locks being on the same cache line.
// NOTE(review): presumably chosen so the struct fills a 64-byte cache line
// on 64-bit platforms — confirm against the size of sync.RWMutex.
_ [40]byte
}
// newStripeSeries returns a stripeSeries in which the ID map and the hash map
// of every stripe are allocated and empty, so they can be written to directly.
func newStripeSeries() *stripeSeries {
	res := &stripeSeries{}

	for idx := 0; idx < stripeSize; idx++ {
		res.series[idx] = make(map[uint64]*memSeries)
		res.hashes[idx] = make(seriesHashmap)
	}
	return res
}
// gc garbage collects old chunks that are strictly before mint and removes
// series entirely that have no chunks left.
// It returns the set of IDs of the series that were removed and the sum of
// the values returned by truncateChunksBefore across all series.
// NOTE(review): a second stripe lock is taken while one is already held;
// presumably gc is never run concurrently with itself — confirm.
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
var (
deleted = map[uint64]struct{}{}
rmChunks = 0
)
// Run through all series, stripe by stripe, and truncate old chunks. Series
// with no chunks left are removed from both indices and their ID is recorded
// in deleted.
for i := 0; i < stripeSize; i++ {
// Write-lock the stripe for the whole pass over its hash buckets.
s.locks[i].Lock()
for hash, all := range s.hashes[i] {
for _, series := range all {
series.mtx.Lock()
rmChunks += series.truncateChunksBefore(mint)
if len(series.chunks) > 0 {
series.mtx.Unlock()
continue
}
// The series is gone entirely. We need to keep the series lock
// and make sure we have acquired the stripe locks for hash and ID of the
// series alike.
// If we don't hold them all, there's a very small chance that a series receives
// samples again while we are half-way into deleting it.
// j is the stripe of the series by ID; it may differ from the hash stripe i.
j := int(series.ref & stripeMask)
// Only lock stripe j if it is not the stripe we already hold.
if i != j {
s.locks[j].Lock()
}
deleted[series.ref] = struct{}{}
// del replaces or deletes the map entry for hash; the slice being ranged
// over (all) was read before and is not modified by it.
s.hashes[i].del(hash, series.lset)
delete(s.series[j], series.ref)
if i != j {
s.locks[j].Unlock()
}
series.mtx.Unlock()
}
}
s.locks[i].Unlock()
}
return deleted, rmChunks
}
// getByID returns the series registered under id, or nil if there is none.
// Only the read lock of the stripe that id maps to is held during the lookup.
func (s *stripeSeries) getByID(id uint64) *memSeries {
	stripe := id & stripeMask
	lock := &s.locks[stripe]

	lock.RLock()
	defer lock.RUnlock()

	return s.series[stripe][id]
}
// getByHash returns the series with label set lset from the stripe that hash
// maps to, or nil if there is none. Only that stripe's read lock is held
// during the lookup.
func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
	stripe := hash & stripeMask
	lock := &s.locks[stripe]

	lock.RLock()
	defer lock.RUnlock()

	return s.hashes[stripe].get(hash, lset)
}
// getOrSet registers series under hash unless a series with an equal label set
// is already present in that hash stripe.
//
// If a series already exists, it is returned together with false and the
// passed series is not stored. Otherwise the passed series is inserted into
// the hash index and then into the ID index (keyed by series.ref), and is
// returned together with true.
//
// The hash stripe and the ID stripe are locked one after the other, never at
// the same time.
func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
	i := hash & stripeMask

	s.locks[i].Lock()
	if prev := s.hashes[i].get(hash, series.lset); prev != nil {
		// Release the stripe before returning; leaving it locked would block
		// every later access to this stripe forever.
		s.locks[i].Unlock()
		return prev, false
	}
	// set already appends the series to the bucket for hash; appending it a
	// second time here would leave a duplicate entry behind.
	s.hashes[i].set(hash, series)
	s.locks[i].Unlock()

	i = series.ref & stripeMask

	s.locks[i].Lock()
	s.series[i][series.ref] = series
	s.locks[i].Unlock()

	return series, true
}
type sample struct { type sample struct {
t int64 t int64
v float64 v float64

View file

@ -117,22 +117,22 @@ func TestHead_Truncate(t *testing.T) {
require.Equal(t, []*memChunk{ require.Equal(t, []*memChunk{
{minTime: 2000, maxTime: 2999}, {minTime: 2000, maxTime: 2999},
}, h.series[s1.ref].chunks) }, h.series.getByID(s1.ref).chunks)
require.Equal(t, []*memChunk{ require.Equal(t, []*memChunk{
{minTime: 2000, maxTime: 2999}, {minTime: 2000, maxTime: 2999},
{minTime: 3000, maxTime: 3999}, {minTime: 3000, maxTime: 3999},
}, h.series[s2.ref].chunks) }, h.series.getByID(s2.ref).chunks)
require.Nil(t, h.series[s3.ref]) require.Nil(t, h.series.getByID(s3.ref))
require.Nil(t, h.series[s4.ref]) require.Nil(t, h.series.getByID(s4.ref))
postingsA1, _ := expandPostings(h.postings.get(term{"a", "1"})) postingsA1, _ := expandPostings(h.postings.get("a", "1"))
postingsA2, _ := expandPostings(h.postings.get(term{"a", "2"})) postingsA2, _ := expandPostings(h.postings.get("a", "2"))
postingsB1, _ := expandPostings(h.postings.get(term{"b", "1"})) postingsB1, _ := expandPostings(h.postings.get("b", "1"))
postingsB2, _ := expandPostings(h.postings.get(term{"b", "2"})) postingsB2, _ := expandPostings(h.postings.get("b", "2"))
postingsC1, _ := expandPostings(h.postings.get(term{"c", "1"})) postingsC1, _ := expandPostings(h.postings.get("c", "1"))
postingsAll, _ := expandPostings(h.postings.get(term{"", ""})) postingsAll, _ := expandPostings(h.postings.get("", ""))
require.Equal(t, []uint64{s1.ref}, postingsA1) require.Equal(t, []uint64{s1.ref}, postingsA1)
require.Equal(t, []uint64{s2.ref}, postingsA2) require.Equal(t, []uint64{s2.ref}, postingsA2)
@ -517,7 +517,7 @@ func boundedSamples(full []sample, mint, maxt int64) []sample {
full = full[1:] full = full[1:]
} }
for i, s := range full { for i, s := range full {
// Terminate on the first sample larger than maxt. // Terminate on the first sample larger than maxt.
if s.t > maxt { if s.t > maxt {
return full[:i] return full[:i]
} }

View file

@ -43,7 +43,7 @@ func newMockIndex() mockIndex {
return mockIndex{ return mockIndex{
series: make(map[uint64]series), series: make(map[uint64]series),
labelIndex: make(map[string][]string), labelIndex: make(map[string][]string),
postings: &memPostings{m: make(map[term][]uint64)}, postings: newMemPostings(),
symbols: make(map[string]struct{}), symbols: make(map[string]struct{}),
} }
} }
@ -84,14 +84,14 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
} }
func (m mockIndex) WritePostings(name, value string, it Postings) error { func (m mockIndex) WritePostings(name, value string, it Postings) error {
if _, ok := m.postings.m[term{name, value}]; ok { if _, ok := m.postings.m[labels.Label{name, value}]; ok {
return errors.Errorf("postings for %s=%q already added", name, value) return errors.Errorf("postings for %s=%q already added", name, value)
} }
ep, err := expandPostings(it) ep, err := expandPostings(it)
if err != nil { if err != nil {
return err return err
} }
m.postings.m[term{name, value}] = ep m.postings.m[labels.Label{name, value}] = ep
return it.Err() return it.Err()
} }
@ -110,7 +110,7 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) {
} }
func (m mockIndex) Postings(name, value string) (Postings, error) { func (m mockIndex) Postings(name, value string) (Postings, error) {
return m.postings.get(term{name, value}), nil return m.postings.get(name, value), nil
} }
func (m mockIndex) SortedPostings(p Postings) Postings { func (m mockIndex) SortedPostings(p Postings) Postings {
@ -274,7 +274,7 @@ func TestPersistence_index_e2e(t *testing.T) {
// Population procedure as done by compaction. // Population procedure as done by compaction.
var ( var (
postings = &memPostings{m: make(map[term][]uint64, 512)} postings = newMemPostings()
values = map[string]stringset{} values = map[string]stringset{}
) )
@ -292,9 +292,8 @@ func TestPersistence_index_e2e(t *testing.T) {
values[l.Name] = valset values[l.Name] = valset
} }
valset.set(l.Value) valset.set(l.Value)
postings.add(uint64(i), term{name: l.Name, value: l.Value})
} }
postings.add(uint64(i), s.labels)
i++ i++
} }
@ -313,10 +312,10 @@ func TestPersistence_index_e2e(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
mi.WritePostings("", "", newListPostings(all)) mi.WritePostings("", "", newListPostings(all))
for tm := range postings.m { for l := range postings.m {
err = iw.WritePostings(tm.name, tm.value, postings.get(tm)) err = iw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value))
require.NoError(t, err) require.NoError(t, err)
mi.WritePostings(tm.name, tm.value, postings.get(tm)) mi.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value))
} }
err = iw.Close() err = iw.Close()
@ -326,10 +325,10 @@ func TestPersistence_index_e2e(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
for p := range mi.postings.m { for p := range mi.postings.m {
gotp, err := ir.Postings(p.name, p.value) gotp, err := ir.Postings(p.Name, p.Value)
require.NoError(t, err) require.NoError(t, err)
expp, err := mi.Postings(p.name, p.value) expp, err := mi.Postings(p.Name, p.Value)
var lset, explset labels.Labels var lset, explset labels.Labels
var chks, expchks []ChunkMeta var chks, expchks []ChunkMeta

View file

@ -218,3 +218,25 @@ func BenchmarkLabelSetEquals(b *testing.B) {
} }
_ = res _ = res
} }
func BenchmarkLabelSetHash(b *testing.B) {
// The vast majority of comparisons will be against a matching label set.
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
}
ls := FromMap(m)
var res uint64
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
res += ls.Hash()
}
fmt.Println(res)
}

View file

@ -17,31 +17,47 @@ import (
"encoding/binary" "encoding/binary"
"sort" "sort"
"strings" "strings"
"sync"
"github.com/prometheus/tsdb/labels"
) )
type memPostings struct { type memPostings struct {
m map[term][]uint64 mtx sync.RWMutex
m map[labels.Label][]uint64
} }
type term struct { func newMemPostings() *memPostings {
name, value string return &memPostings{
m: make(map[labels.Label][]uint64, 512),
}
} }
// get returns an iterator over the postings list for the given name/value pair. // get returns an iterator over the postings list for the given name/value pair.
func (p *memPostings) get(t term) Postings { func (p *memPostings) get(name, value string) Postings {
l := p.m[t] p.mtx.RLock()
l := p.m[labels.Label{Name: name, Value: value}]
p.mtx.RUnlock()
if l == nil { if l == nil {
return emptyPostings return emptyPostings
} }
return newListPostings(l) return newListPostings(l)
} }
var allLabel = labels.Label{}
// add adds a document to the index. The caller has to ensure that no // add adds a document to the index. The caller has to ensure that no
// term argument appears twice. // term argument appears twice.
func (p *memPostings) add(id uint64, terms ...term) { func (p *memPostings) add(id uint64, lset labels.Labels) {
for _, t := range terms { p.mtx.Lock()
p.m[t] = append(p.m[t], id)
for _, l := range lset {
p.m[l] = append(p.m[l], id)
} }
p.m[allLabel] = append(p.m[allLabel], id)
p.mtx.Unlock()
} }
// Postings provides iterative access over a postings list. // Postings provides iterative access over a postings list.

View file

@ -228,7 +228,7 @@ func createIdxChkReaders(tc []struct {
return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0
}) })
postings := &memPostings{m: make(map[term][]uint64, 512)} postings := newMemPostings()
chkReader := mockChunkReader(make(map[uint64]chunks.Chunk)) chkReader := mockChunkReader(make(map[uint64]chunks.Chunk))
lblIdx := make(map[string]stringset) lblIdx := make(map[string]stringset)
mi := newMockIndex() mi := newMockIndex()
@ -257,10 +257,9 @@ func createIdxChkReaders(tc []struct {
ls := labels.FromMap(s.lset) ls := labels.FromMap(s.lset)
mi.AddSeries(uint64(i), ls, metas...) mi.AddSeries(uint64(i), ls, metas...)
postings.add(uint64(i), term{}) postings.add(uint64(i), ls)
for _, l := range ls {
postings.add(uint64(i), term{l.Name, l.Value})
for _, l := range ls {
vs, present := lblIdx[l.Name] vs, present := lblIdx[l.Name]
if !present { if !present {
vs = stringset{} vs = stringset{}
@ -274,8 +273,8 @@ func createIdxChkReaders(tc []struct {
mi.WriteLabelIndex([]string{l}, vs.slice()) mi.WriteLabelIndex([]string{l}, vs.slice())
} }
for tm := range postings.m { for l := range postings.m {
mi.WritePostings(tm.name, tm.value, postings.get(tm)) mi.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value))
} }
return mi, chkReader return mi, chkReader

View file

@ -60,7 +60,6 @@ func BenchmarkHash(b *testing.B) {
} }
}) })
} }
} }
// hashAdd adds a string to a fnv64a hash value, returning the updated hash. // hashAdd adds a string to a fnv64a hash value, returning the updated hash.

5
wal.go
View file

@ -111,6 +111,9 @@ type WALReader interface {
type RefSeries struct { type RefSeries struct {
Ref uint64 Ref uint64
Labels labels.Labels Labels labels.Labels
// hash for the label set. This field is not generally populated.
hash uint64
} }
// RefSample is a timestamp/value pair associated with a reference to a series. // RefSample is a timestamp/value pair associated with a reference to a series.
@ -118,6 +121,8 @@ type RefSample struct {
Ref uint64 Ref uint64
T int64 T int64
V float64 V float64
series *memSeries
} }
type segmentFile struct { type segmentFile struct {