From 012cf4ef254e34a10befd0b592bcfa5b1794e92b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 4 Feb 2017 11:53:52 +0100 Subject: [PATCH] Count writer references on head blocks --- cmd/tsdb/main.go | 2 +- db.go | 9 ++++++++- head.go | 9 ++++++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 1cb9efaf2d..4a838664b1 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -122,7 +122,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 4000) + total, err = b.ingestScrapes(metrics, 3000) if err != nil { exitWithError(err) } diff --git a/db.go b/db.go index 93c9e1420f..4b69601e7a 100644 --- a/db.go +++ b/db.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "unsafe" @@ -29,7 +30,7 @@ import ( var DefaultOptions = &Options{ WALFlushInterval: 5 * time.Second, MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds - MaxBlockDuration: 48 * 60 * 60 * 1000, // 1 day in milliseconds + MaxBlockDuration: 48 * 60 * 60 * 1000, // 2 days in milliseconds AppendableBlocks: 2, } @@ -503,6 +504,12 @@ func (db *DB) compactable() []Block { } for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { + // Blocks that won't be appendable when instantiating a new appender + // might still have active appenders on them. + // Abort at the first one we encounter. + if atomic.LoadUint64(&h.activeWriters) > 0 { + break + } blocks = append(blocks, h) } return blocks diff --git a/head.go b/head.go index 3e0d0be471..1c69ac3bb4 100644 --- a/head.go +++ b/head.go @@ -7,6 +7,7 @@ import ( "os" "sort" "sync" + "sync/atomic" "time" "github.com/bradfitz/slice" @@ -40,6 +41,8 @@ type headBlock struct { generation uint8 wal *WAL + activeWriters uint64 + // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. series []*memSeries @@ -147,6 +150,8 @@ func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} } func (h *headBlock) Appender() Appender { + atomic.AddUint64(&h.activeWriters, 1) + h.mtx.RLock() return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} } @@ -295,6 +300,7 @@ func (a *headAppender) createSeries() { } func (a *headAppender) Commit() error { + defer atomic.AddUint64(&a.activeWriters, ^uint64(0)) defer putHeadAppendBuffer(a.samples) a.createSeries() @@ -345,8 +351,9 @@ func (a *headAppender) Commit() error { } func (a *headAppender) Rollback() error { - putHeadAppendBuffer(a.samples) a.mtx.RUnlock() + atomic.AddUint64(&a.activeWriters, ^uint64(0)) + putHeadAppendBuffer(a.samples) return nil }