diff --git a/db.go b/db.go index 93e2dfd1a..e6ec3bca0 100644 --- a/db.go +++ b/db.go @@ -466,18 +466,19 @@ func (db *DB) Appender() Appender { // anyway. For now this, with combination of only having a single timestamp per batch, // prevents opening more than one appender and hitting an unresolved deadlock (#11). // - // // Only instantiate appender after returning the headmtx to avoid - // // questionable locking order. - // db.headmtx.RLock() - // app := db.appendable() - // db.headmtx.RUnlock() - // for _, b := range app { - // a.heads = append(a.heads, &metaAppender{ - // meta: b.Meta(), - // app: b.Appender().(*headAppender), - // }) - // } + // Only instantiate appender after returning the headmtx to avoid + // questionable locking order. + db.headmtx.RLock() + app := db.appendable() + db.headmtx.RUnlock() + + for _, b := range app { + a.heads = append(a.heads, &metaAppender{ + meta: b.Meta(), + app: b.Appender(), + }) + } return a } @@ -556,26 +557,26 @@ func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { a.db.headmtx.Unlock() // XXX(fabxc): temporary workaround. See comment on instantiating DB.Appender. - for _, b := range newHeads { - // Only get appender for the block with the specific timestamp. - if t >= b.Meta().MaxTime { - continue - } - a.heads = append(a.heads, &metaAppender{ - app: b.Appender(), - meta: b.Meta(), - }) - break - } - - // // Instantiate appenders after returning headmtx to avoid questionable - // // locking order. // for _, b := range newHeads { + // // Only get appender for the block with the specific timestamp. + // if t >= b.Meta().MaxTime { + // continue + // } // a.heads = append(a.heads, &metaAppender{ // app: b.Appender(), // meta: b.Meta(), // }) + // break // } + + // Instantiate appenders after returning headmtx to avoid questionable + // locking order. + for _, b := range newHeads { + a.heads = append(a.heads, &metaAppender{ + app: b.Appender(), + meta: b.Meta(), + }) + } } for i := len(a.heads) - 1; i >= 0; i-- { if h := a.heads[i]; t >= h.meta.MinTime { @@ -613,28 +614,37 @@ func (db *DB) ensureHead(t int64) error { } func (a *dbAppender) Commit() error { - var merr MultiError + defer a.db.mtx.RUnlock() + + // Commits to partial appenders must be concurrent as concurrent appenders + // may have conflicting locks on head appenders. + // XXX(fabxc): is this a leaky abstraction? Should make an effort to catch a multi-error? + var g errgroup.Group for _, h := range a.heads { - merr.Add(h.app.Commit()) + g.Go(h.app.Commit) } - a.db.mtx.RUnlock() - if merr.Err() == nil { - a.db.metrics.samplesAppended.Add(float64(a.samples)) + if err := g.Wait(); err != nil { + return err } - return merr.Err() + // XXX(fabxc): Push the metric down into head block to account properly + // for partial appends? + a.db.metrics.samplesAppended.Add(float64(a.samples)) + + return nil } func (a *dbAppender) Rollback() error { - var merr MultiError + defer a.db.mtx.RUnlock() + + var g errgroup.Group for _, h := range a.heads { - merr.Add(h.app.Rollback()) + g.Go(h.app.Commit) } - a.db.mtx.RUnlock() - return merr.Err() + return g.Wait() } // appendable returns a copy of a slice of HeadBlocks that can still be appended to. diff --git a/querier.go b/querier.go index 3ee90dd6f..d4e71e821 100644 --- a/querier.go +++ b/querier.go @@ -258,6 +258,9 @@ func (nopSeriesSet) Next() bool { return false } func (nopSeriesSet) At() Series { return nil } func (nopSeriesSet) Err() error { return nil } +// mergedSeriesSet takes two series sets as a single series set. The input series sets +// must be sorted and sequential in time, i.e. if they have the same label set, +// the datapoints of a must be before the datapoints of b. type mergedSeriesSet struct { a, b SeriesSet @@ -307,11 +310,9 @@ func (s *mergedSeriesSet) Next() bool { if d > 0 { s.cur = s.b.At() s.bdone = !s.b.Next() - } else if d < 0 { s.cur = s.a.At() s.adone = !s.a.Next() - } else { s.cur = &chainedSeries{series: []Series{s.a.At(), s.b.At()}} s.adone = !s.a.Next()