From 300f4e2abff98372c1e9d4bae4670f74fde6cc26 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 6 Jan 2017 17:23:12 +0100
Subject: [PATCH] Use separate lock for series creation

This uses the head block's own lock to only lock if new series were
encountered. In the general append case we just need to hold a read lock.
---
 cmd/tsdb/main.go |  2 +-
 db.go            |  7 +++--
 head.go          | 71 ++++++++++++++++++++++++++++++++++++++++++------
 wal.go           | 11 +++++++-
 4 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index 884f8011af..034db8a32f 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -91,7 +91,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 
     dir := filepath.Join(b.outPath, "storage")
 
-    st, err := tsdb.OpenPartitioned(dir, 8, nil, nil)
+    st, err := tsdb.OpenPartitioned(dir, 1, nil, nil)
     if err != nil {
         exitWithError(err)
     }
diff --git a/db.go b/db.go
index a2d440780f..28277fcd88 100644
--- a/db.go
+++ b/db.go
@@ -339,8 +339,8 @@ func (db *DB) appendBatch(samples []hashedSample) error {
     if len(samples) == 0 {
         return nil
     }
-    db.mtx.Lock()
-    defer db.mtx.Unlock()
+    db.mtx.RLock()
+    defer db.mtx.RUnlock()
 
     head := db.heads[len(db.heads)-1]
 
@@ -426,6 +426,9 @@ func (db *DB) reinit(dir string) error {
 }
 
 func (db *DB) compactable() []block {
+    db.mtx.RLock()
+    defer db.mtx.RUnlock()
+
     var blocks []block
     for _, pb := range db.persisted {
         blocks = append([]block{pb}, blocks...)
diff --git a/head.go b/head.go
index 0581bb34bd..e58f629b78 100644
--- a/head.go
+++ b/head.go
@@ -5,6 +5,7 @@ import (
     "math"
     "sort"
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/bradfitz/slice"
@@ -100,6 +101,9 @@ func (h *HeadBlock) stats() BlockStats { return h.bstats }
 
 // Chunk returns the chunk for the reference number.
 func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
+    h.mtx.RLock()
+    defer h.mtx.RUnlock()
+
     if int(ref) >= len(h.descs) {
         return nil, errNotFound
     }
@@ -107,16 +111,25 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
 }
 
 func (h *HeadBlock) interval() (int64, int64) {
+    h.mtx.RLock()
+    defer h.mtx.RUnlock()
+
     return h.bstats.MinTime, h.bstats.MaxTime
 }
 
 // Stats returns statisitics about the indexed data.
 func (h *HeadBlock) Stats() (BlockStats, error) {
+    h.mtx.RLock()
+    defer h.mtx.RUnlock()
+
     return h.bstats, nil
 }
 
 // LabelValues returns the possible label values
 func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) {
+    h.mtx.RLock()
+    defer h.mtx.RUnlock()
+
     if len(names) != 1 {
         return nil, errInvalidSize
     }
@@ -132,11 +145,17 @@ func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) {
 
 // Postings returns the postings list iterator for the label pair.
 func (h *HeadBlock) Postings(name, value string) (Postings, error) {
+    h.mtx.RLock()
+    defer h.mtx.RUnlock()
+
     return h.postings.get(term{name: name, value: value}), nil
 }
 
 // Series returns the series for the given reference.
 func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
+    h.mtx.RLock()
+    defer h.mtx.RUnlock()
+
     if int(ref) >= len(h.descs) {
         return nil, nil, errNotFound
     }
@@ -151,6 +170,9 @@ func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
 }
 
 func (h *HeadBlock) LabelIndices() ([][]string, error) {
+    h.mtx.RLock()
+    defer h.mtx.RUnlock()
+
     res := [][]string{}
 
     for s := range h.values {
@@ -226,9 +248,11 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
 	// ones we haven't seen before.
     var (
         newSeries    []labels.Labels
+        newSamples   []*hashedSample
         newHashes    []uint64
         uniqueHashes = map[uint64]uint32{}
     )
+    h.mtx.RLock()
 
     for i := range samples {
         s := &samples[i]
@@ -254,13 +278,16 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
             s.ref = ref
             continue
         }
-        s.ref = uint32(len(h.descs) + len(newSeries))
+        s.ref = uint32(len(newSeries))
         uniqueHashes[s.hash] = s.ref
 
         newSeries = append(newSeries, s.labels)
         newHashes = append(newHashes, s.hash)
+        newSamples = append(newSamples, s)
     }
 
+    h.mtx.RUnlock()
+
     // Write all new series and samples to the WAL and add it to the
     // in-mem database on success.
     if err := h.wal.Log(newSeries, samples); err != nil {
@@ -269,27 +296,55 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
 
     // After the samples were successfully written to the WAL, there may
     // be no further failures.
-    for i, s := range newSeries {
-        h.create(newHashes[i], s)
+    if len(newSeries) > 0 {
+        h.mtx.Lock()
+
+        base := len(h.descs)
+
+        for i, s := range newSeries {
+            h.create(newHashes[i], s)
+        }
+        for _, s := range newSamples {
+            s.ref = uint32(base) + s.ref
+        }
+
+        h.mtx.Unlock()
+
+        h.mtx.RLock()
+        defer h.mtx.RUnlock()
     }
 
+    total := len(samples)
+
     for _, s := range samples {
         cd := h.descs[s.ref]
         // Skip duplicate samples.
         if cd.lastTimestamp == s.t && cd.lastValue != s.v {
+            total--
             continue
         }
         cd.append(s.t, s.v)
 
-        if s.t > h.bstats.MaxTime {
-            h.bstats.MaxTime = s.t
+        if t := h.bstats.MaxTime; s.t > t {
+            // h.bstats.MaxTime = s.t
+            for !atomic.CompareAndSwapInt64(&h.bstats.MaxTime, t, s.t) {
+                if t = h.bstats.MaxTime; s.t <= t {
+                    break
+                }
+            }
         }
-        if s.t < h.bstats.MinTime {
-            h.bstats.MinTime = s.t
+        if t := h.bstats.MinTime; s.t < t {
+            // h.bstats.MinTime = s.t
+            for !atomic.CompareAndSwapInt64(&h.bstats.MinTime, t, s.t) {
+                if t = h.bstats.MinTime; s.t >= t {
+                    break
+                }
+            }
         }
-        h.bstats.SampleCount++
     }
 
+    atomic.AddUint64(&h.bstats.SampleCount, uint64(total))
+
     return nil
 }
 
diff --git a/wal.go b/wal.go
index 0132f05e5c..c64f130a29 100644
--- a/wal.go
+++ b/wal.go
@@ -7,6 +7,7 @@ import (
     "math"
     "os"
     "path/filepath"
+    "sync"
     "time"
 
     "github.com/coreos/etcd/pkg/fileutil"
@@ -29,6 +30,8 @@ const (
 // WAL is a write ahead log for series data. It can only be written to.
 // Use WALReader to read back from a write ahead log.
 type WAL struct {
+    mtx sync.Mutex
+
     f      *fileutil.LockedFile
     enc    *walEncoder
     logger log.Logger
@@ -108,6 +111,9 @@ func (w *WAL) ReadAll(h *walHandler) error {
 
 // Log writes a batch of new series labels and samples to the log.
 func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error {
+    w.mtx.Lock()
+    defer w.mtx.Unlock()
+
     if err := w.enc.encodeSeries(series); err != nil {
         return err
     }
@@ -142,9 +148,12 @@ func (w *WAL) run(interval time.Duration) {
         case <-w.stopc:
             return
         case <-tick:
+            w.mtx.Lock()
+
             if err := w.sync(); err != nil {
                 w.logger.Log("msg", "sync failed", "err", err)
             }
+            w.mtx.Unlock()
         }
     }
 }
@@ -172,7 +181,7 @@ const (
     // walPageBytes is the alignment for flushing records to the backing Writer.
     // It should be a multiple of the minimum sector size so that WAL can safely
     // distinguish between torn writes and ordinary data corruption.
-    walPageBytes = 8 * minSectorSize
+    walPageBytes = 32 * minSectorSize
 )
 
 func newWALEncoder(f *os.File) (*walEncoder, error) {
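Reviewer note: the appendBatch change above follows a two-phase locking pattern: scan for unknown series while holding only the read lock, take the write lock just long enough to create the series that were actually missing, then re-acquire the read lock for the plain append path. Below is a minimal standalone sketch of that pattern; it is not tsdb code, and the seriesSet type and its refs map are invented purely for illustration.

package main

import (
	"fmt"
	"sync"
)

// seriesSet mimics the head block's series table: lookups take the
// read lock, creation of new entries takes the write lock.
type seriesSet struct {
	mtx  sync.RWMutex
	refs map[string]int
}

func (s *seriesSet) append(keys []string) {
	// Phase 1: find unknown keys while holding only the read lock.
	s.mtx.RLock()
	var missing []string
	for _, k := range keys {
		if _, ok := s.refs[k]; !ok {
			missing = append(missing, k)
		}
	}
	s.mtx.RUnlock()

	// Phase 2: only if new entries were encountered, take the write
	// lock and create them. Re-check under the lock since another
	// goroutine may have created them in the meantime.
	if len(missing) > 0 {
		s.mtx.Lock()
		for _, k := range missing {
			if _, ok := s.refs[k]; !ok {
				s.refs[k] = len(s.refs)
			}
		}
		s.mtx.Unlock()
	}

	// Phase 3: the common append path runs under the read lock only.
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	for _, k := range keys {
		fmt.Println("append to ref", s.refs[k])
	}
}

func main() {
	s := &seriesSet{refs: map[string]int{}}
	s.append([]string{"a", "b", "a"})
}

In the patch itself the refs handed out during the read-lock scan are indices into newSeries and are rebased onto base = len(h.descs) once the write lock is held, which keeps the write-lock section short.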
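Because MinTime, MaxTime, and SampleCount can now be touched by concurrent appenders that only hold the read lock, the patch moves them to atomic updates: a compare-and-swap retry loop for the min/max fields and a single atomic.AddUint64 for the sample count. A reduced sketch of the max-update loop follows, assuming a plain int64 field; it uses atomic.LoadInt64 for the re-read, where the patch re-reads the field with an ordinary load.

package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// casMax raises *max to t unless a concurrent writer already stored
// an equal or larger value.
func casMax(max *int64, t int64) {
	for {
		cur := atomic.LoadInt64(max)
		if t <= cur {
			return // lost the race to an equal or larger value; nothing to do
		}
		if atomic.CompareAndSwapInt64(max, cur, t) {
			return // our value is now the maximum
		}
		// CAS failed because cur went stale; reload and retry.
	}
}

func main() {
	maxTime := int64(math.MinInt64)
	casMax(&maxTime, 1000)
	casMax(&maxTime, 500) // no effect, 1000 is already stored
	fmt.Println(maxTime)  // 1000
}

The loop gives up as soon as another writer has stored an equal or larger timestamp, which is the same condition the patch checks before retrying the CAS.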