diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 491c2ac35..960c9c05e 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -139,7 +139,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 2000) + total, err = b.ingestScrapes(metrics, 15000) if err != nil { exitWithError(err) } @@ -147,6 +147,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { fmt.Println(" > total samples:", total) fmt.Println(" > samples/sec:", float64(total)/dur.Seconds()) + select {} measureTime("stopStorage", func() { if err := b.storage.Close(); err != nil { diff --git a/head.go b/head.go index 4e1e85877..b2d6e9842 100644 --- a/head.go +++ b/head.go @@ -403,11 +403,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { return errors.Wrap(ErrNotFound, "unknown series") } s.Lock() - if err := s.appendable(t, v); err != nil { - return err - } + err := s.appendable(t, v) s.Unlock() + if err != nil { + return err + } if t < a.mint { return ErrOutOfBounds } diff --git a/wal.go b/wal.go index 747510fd6..9af9a1853 100644 --- a/wal.go +++ b/wal.go @@ -417,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { tf.minSeries = s.Ref } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -447,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { tf.maxTime = s.T } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -479,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { } } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -537,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { func (w *SegmentWAL) cut() error { // Sync current head to disk and close. if hf := w.head(); hf != nil { - if err := w.sync(); err != nil { - return err - } - off, err := hf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := hf.Truncate(off); err != nil { - return err - } - if err := hf.Close(); err != nil { + if err := w.flush(); err != nil { return err } + // Finish last segment asynchronously to not block the WAL moving along + // in the new segment. + go func() { + off, err := hf.Seek(0, os.SEEK_CUR) + if err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Truncate(off); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Sync(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Close(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + }() } p, _, err := nextSequenceFile(w.dirFile.Name()) @@ -561,9 +556,11 @@ func (w *SegmentWAL) cut() error { return err } - if err = w.dirFile.Sync(); err != nil { - return err - } + go func() { + if err = w.dirFile.Sync(); err != nil { + w.logger.Log("msg", "sync WAL directory", "err", err) + } + }() w.files = append(w.files, newSegmentFile(f)) diff --git a/wal_test.go b/wal_test.go index 45279b1b5..180623dd0 100644 --- a/wal_test.go +++ b/wal_test.go @@ -91,9 +91,8 @@ func TestSegmentWAL_cut(t *testing.T) { require.NoError(t, w.cut(), "cut failed") - // Cutting creates a new file and close the previous tail file. + // Cutting creates a new file. require.Equal(t, 2, len(w.files)) - require.Error(t, w.files[0].Close()) require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))