diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go
index 7d8174f0d..a3bc7d17a 100644
--- a/vendor/github.com/prometheus/tsdb/compact.go
+++ b/vendor/github.com/prometheus/tsdb/compact.go
@@ -17,7 +17,6 @@ import (
 	"math/rand"
 	"os"
 	"path/filepath"
-	"runtime"
 	"sort"
 	"time"
 
@@ -365,10 +364,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 		}
 		c.metrics.ran.Inc()
 		c.metrics.duration.Observe(time.Since(t).Seconds())
-
-		// We might have done quite a few allocs. Enforce a GC so they do not accumulate
-		// with subsequent compactions or head GCs.
-		runtime.GC()
 	}(time.Now())
 
 	dir := filepath.Join(dest, meta.ULID.String())
@@ -570,14 +565,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 			return errors.Wrap(err, "write postings")
 		}
 	}
-	// Write a postings list containing all series.
-	all := make([]uint64, i)
-	for i := range all {
-		all[i] = uint64(i)
-	}
-	if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
-		return errors.Wrap(err, "write 'all' postings")
-	}
 	return nil
 }
 
diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go
index ad034d8b0..c9745cfc6 100644
--- a/vendor/github.com/prometheus/tsdb/db.go
+++ b/vendor/github.com/prometheus/tsdb/db.go
@@ -21,6 +21,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"runtime"
 	"sort"
 	"strconv"
 	"sync"
@@ -349,9 +350,12 @@ func (db *DB) compact() (changes bool, err error) {
 		}
 		changes = true
 
+		runtime.GC()
+
 		if err := db.reload(); err != nil {
 			return changes, errors.Wrap(err, "reload blocks")
 		}
+		runtime.GC()
 	}
 
 	// Check for compactions of multiple blocks.
@@ -380,10 +384,12 @@ func (db *DB) compact() (changes bool, err error) {
 				return changes, errors.Wrap(err, "delete compacted block")
 			}
 		}
+		runtime.GC()
 
 		if err := db.reload(); err != nil {
 			return changes, errors.Wrap(err, "reload blocks")
 		}
+		runtime.GC()
 	}
 
 	return changes, nil
diff --git a/vendor/github.com/prometheus/tsdb/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/encoding_helpers.go
index 9aa4ba409..17c3ff081 100644
--- a/vendor/github.com/prometheus/tsdb/encoding_helpers.go
+++ b/vendor/github.com/prometheus/tsdb/encoding_helpers.go
@@ -77,6 +77,22 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
 func (d *decbuf) be32int() int      { return int(d.be32()) }
 func (d *decbuf) be64int64() int64  { return int64(d.be64()) }
 
+// uvarintTempStr decodes like uvarintStr but the returned string is
+// not safe to use if the underlying buffer changes.
+func (d *decbuf) uvarintTempStr() string {
+	l := d.uvarint64()
+	if d.e != nil {
+		return ""
+	}
+	if len(d.b) < int(l) {
+		d.e = errInvalidSize
+		return ""
+	}
+	s := yoloString(d.b[:l])
+	d.b = d.b[l:]
+	return s
+}
+
 func (d *decbuf) uvarintStr() string {
 	l := d.uvarint64()
 	if d.e != nil {
diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go
index a74552bca..ea7b63f8a 100644
--- a/vendor/github.com/prometheus/tsdb/head.go
+++ b/vendor/github.com/prometheus/tsdb/head.go
@@ -15,7 +15,6 @@ package tsdb
 
 import (
 	"math"
-	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -402,10 +401,13 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
 	if s == nil {
 		return errors.Wrap(ErrNotFound, "unknown series")
 	}
-	if err := s.appendable(t, v); err != nil {
+	s.Lock()
+	err := s.appendable(t, v)
+	s.Unlock()
+
+	if err != nil {
 		return err
 	}
-
 	if t < a.mint {
 		return ErrOutOfBounds
 	}
@@ -435,7 +437,10 @@ func (a *headAppender) Commit() error {
 	total := len(a.samples)
 
 	for _, s := range a.samples {
+		s.series.Lock()
 		ok, chunkCreated := s.series.append(s.T, s.V)
+		s.series.Unlock()
+
 		if !ok {
 			total--
 		}
@@ -509,8 +514,6 @@ Outer:
 
 // gc removes data before the minimum timestmap from the head.
 func (h *Head) gc() {
-	defer runtime.GC()
-
 	// Only data strictly lower than this timestamp must be deleted.
 	mint := h.MinTime()
 
@@ -672,9 +675,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 
 	s := h.head.series.getByID(sid)
 
-	s.mtx.RLock()
+	s.Lock()
 	c := s.chunk(int(cid))
-	s.mtx.RUnlock()
+	s.Unlock()
 
 	// Do not expose chunks that are outside of the specified range.
 	if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
@@ -694,9 +697,10 @@ type safeChunk struct {
 }
 
 func (c *safeChunk) Iterator() chunks.Iterator {
-	c.s.mtx.RLock()
-	defer c.s.mtx.RUnlock()
-	return c.s.iterator(c.cid)
+	c.s.Lock()
+	it := c.s.iterator(c.cid)
+	c.s.Unlock()
+	return it
 }
 
 // func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") }
@@ -803,8 +807,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM
 	}
 	*lbls = append((*lbls)[:0], s.lset...)
 
-	s.mtx.RLock()
-	defer s.mtx.RUnlock()
+	s.Lock()
+	defer s.Unlock()
 
 	*chks = (*chks)[:0]
 
@@ -956,11 +960,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 
 		for hash, all := range s.hashes[i] {
 			for _, series := range all {
-				series.mtx.Lock()
+				series.Lock()
 				rmChunks += series.truncateChunksBefore(mint)
 
 				if len(series.chunks) > 0 {
-					series.mtx.Unlock()
+					series.Unlock()
 					continue
 				}
 
@@ -983,7 +987,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 					s.locks[j].Unlock()
 				}
 
-				series.mtx.Unlock()
+				series.Unlock()
 			}
 		}
 
@@ -1040,8 +1044,10 @@ type sample struct {
 	v float64
 }
 
+// memSeries is the in-memory representation of a series. None of its methods
+// are goroutine-safe and it is the caller's responsibility to lock it.
 type memSeries struct {
-	mtx sync.RWMutex
+	sync.Mutex
 
 	ref  uint64
 	lset labels.Labels
@@ -1143,8 +1149,6 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
 func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 	const samplesPerChunk = 120
 
-	s.mtx.Lock()
-
 	c := s.head()
 
 	if c == nil {
@@ -1152,7 +1156,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 		chunkCreated = true
 	}
 	if c.maxTime >= t {
-		s.mtx.Unlock()
 		return false, chunkCreated
 	}
 	if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
@@ -1175,8 +1178,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 	s.sampleBuf[2] = s.sampleBuf[3]
 	s.sampleBuf[3] = sample{t: t, v: v}
 
-	s.mtx.Unlock()
-
 	return true, chunkCreated
 }
 
diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go
index ddc2c4f52..fd9b25162 100644
--- a/vendor/github.com/prometheus/tsdb/index.go
+++ b/vendor/github.com/prometheus/tsdb/index.go
@@ -292,10 +292,22 @@ func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkM
 
 	w.buf2.putUvarint(len(chunks))
 
-	for _, c := range chunks {
+	if len(chunks) > 0 {
+		c := chunks[0]
 		w.buf2.putVarint64(c.MinTime)
-		w.buf2.putVarint64(c.MaxTime)
+		w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
 		w.buf2.putUvarint64(c.Ref)
+		t0 := c.MaxTime
+		ref0 := int64(c.Ref)
+
+		for _, c := range chunks[1:] {
+			w.buf2.putUvarint64(uint64(c.MinTime - t0))
+			w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
+			t0 = c.MaxTime
+
+			w.buf2.putVarint64(int64(c.Ref) - ref0)
+			ref0 = int64(c.Ref)
+		}
 	}
 
 	w.buf1.reset()
@@ -335,10 +347,6 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
 
 	for _, s := range symbols {
 		w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
-
-		// NOTE: len(s) gives the number of runes, not the number of bytes.
-		// Therefore the read-back length for strings with unicode characters will
-		// be off when not using putUvarintStr.
 		w.buf2.putUvarintStr(s)
 	}
 
@@ -636,7 +644,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
 		keys := make([]string, 0, keyCount)
 
 		for i := 0; i < keyCount; i++ {
-			keys = append(keys, d2.uvarintStr())
+			keys = append(keys, d2.uvarintTempStr())
 		}
 		res[strings.Join(keys, sep)] = uint32(d2.uvarint())
 
@@ -673,7 +681,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
 func (r *indexReader) lookupSymbol(o uint32) (string, error) {
 	d := r.decbufAt(int(o))
 
-	s := d.uvarintStr()
+	s := d.uvarintTempStr()
 	if d.err() != nil {
 		return "", errors.Wrapf(d.err(), "read symbol at %d", o)
 	}
@@ -688,7 +696,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) {
 	sym := make(map[string]struct{}, count)
 
 	for ; count > 0; count-- {
-		s := d2.uvarintStr()
+		s := d2.uvarintTempStr()
 		sym[s] = struct{}{}
 	}
 
@@ -775,17 +783,34 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 	// Read the chunks meta data.
 	k = int(d2.uvarint())
 
-	for i := 0; i < k; i++ {
-		mint := d2.varint64()
-		maxt := d2.varint64()
-		off := d2.uvarint64()
+	if k == 0 {
+		return nil
+	}
+
+	t0 := d2.varint64()
+	maxt := int64(d2.uvarint64()) + t0
+	ref0 := int64(d2.uvarint64())
+
+	*chks = append(*chks, ChunkMeta{
+		Ref:     uint64(ref0),
+		MinTime: t0,
+		MaxTime: maxt,
+	})
+	t0 = maxt
+
+	for i := 1; i < k; i++ {
+		mint := int64(d2.uvarint64()) + t0
+		maxt := int64(d2.uvarint64()) + mint
+
+		ref0 += d2.varint64()
+		t0 = maxt
 
 		if d2.err() != nil {
 			return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
 		}
 
 		*chks = append(*chks, ChunkMeta{
-			Ref:     off,
+			Ref:     uint64(ref0),
 			MinTime: mint,
 			MaxTime: maxt,
 		})
diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go
index 1dadc8f2c..9af9a1853 100644
--- a/vendor/github.com/prometheus/tsdb/wal.go
+++ b/vendor/github.com/prometheus/tsdb/wal.go
@@ -52,13 +52,16 @@ const (
 	WALEntryDeletes WALEntryType = 4
 )
 
-// SamplesCB is the callback after reading samples.
+// SamplesCB is the callback after reading samples. The passed slice
+// is only valid until the call returns.
 type SamplesCB func([]RefSample) error
 
-// SeriesCB is the callback after reading series.
+// SeriesCB is the callback after reading series. The passed slice
+// is only valid until the call returns.
 type SeriesCB func([]RefSeries) error
 
-// DeletesCB is the callback after reading deletes.
+// DeletesCB is the callback after reading deletes. The passed slice
+// is only valid until the call returns.
 type DeletesCB func([]Stone) error
 
 // WAL is a write ahead log that can log new series labels and samples.
@@ -395,6 +398,10 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error {
 	buf := w.getBuffer()
 
 	flag := w.encodeSeries(buf, series)
+
+	w.mtx.Lock()
+	defer w.mtx.Unlock()
+
 	err := w.write(WALEntrySeries, flag, buf.get())
 
 	w.putBuffer(buf)
@@ -410,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error {
 			tf.minSeries = s.Ref
 		}
 	}
-
-	if w.flushInterval <= 0 {
-		return errors.Wrap(w.Sync(), "sync")
-	}
 	return nil
 }
 
@@ -422,6 +425,10 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
 	buf := w.getBuffer()
 
 	flag := w.encodeSamples(buf, samples)
+
+	w.mtx.Lock()
+	defer w.mtx.Unlock()
+
 	err := w.write(WALEntrySamples, flag, buf.get())
 
 	w.putBuffer(buf)
@@ -436,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
 			tf.maxTime = s.T
 		}
 	}
-
-	if w.flushInterval <= 0 {
-		return errors.Wrap(w.Sync(), "sync")
-	}
 	return nil
 }
 
@@ -448,6 +451,10 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error {
 	buf := w.getBuffer()
 
 	flag := w.encodeDeletes(buf, stones)
+
+	w.mtx.Lock()
+	defer w.mtx.Unlock()
+
 	err := w.write(WALEntryDeletes, flag, buf.get())
 
 	w.putBuffer(buf)
@@ -464,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error {
 			}
 		}
 	}
-
-	if w.flushInterval <= 0 {
-		return errors.Wrap(w.Sync(), "sync")
-	}
 	return nil
 }
 
@@ -522,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) {
 func (w *SegmentWAL) cut() error {
 	// Sync current head to disk and close.
 	if hf := w.head(); hf != nil {
-		if err := w.sync(); err != nil {
-			return err
-		}
-		off, err := hf.Seek(0, os.SEEK_CUR)
-		if err != nil {
-			return err
-		}
-		if err := hf.Truncate(off); err != nil {
-			return err
-		}
-		if err := hf.Close(); err != nil {
+		if err := w.flush(); err != nil {
 			return err
 		}
+		// Finish last segment asynchronously to not block the WAL moving along
+		// in the new segment.
+		go func() {
+			off, err := hf.Seek(0, os.SEEK_CUR)
+			if err != nil {
+				w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
+			}
+			if err := hf.Truncate(off); err != nil {
+				w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
+			}
+			if err := hf.Sync(); err != nil {
+				w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
+			}
+			if err := hf.Close(); err != nil {
+				w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
+			}
+		}()
 	}
 
 	p, _, err := nextSequenceFile(w.dirFile.Name())
@@ -546,9 +556,11 @@ func (w *SegmentWAL) cut() error {
 		return err
 	}
 
-	if err = w.dirFile.Sync(); err != nil {
-		return err
-	}
+	go func() {
+		if err = w.dirFile.Sync(); err != nil {
+			w.logger.Log("msg", "sync WAL directory", "err", err)
+		}
+	}()
 
 	w.files = append(w.files, newSegmentFile(f))
 
@@ -594,6 +606,9 @@ func (w *SegmentWAL) sync() error {
 	if err := w.flush(); err != nil {
 		return err
 	}
+	if w.head() == nil {
+		return nil
+	}
 	return fileutil.Fdatasync(w.head().File)
 }
 
@@ -655,8 +670,6 @@ const (
 )
 
 func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
-	w.mtx.Lock()
-	defer w.mtx.Unlock()
 	// Cut to the next segment if the entry exceeds the file size unless it would also
 	// exceed the size of a new segment.
 	// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
@@ -769,6 +782,10 @@ type walReader struct {
 	curBuf     []byte
 	lastOffset int64 // offset after last successfully read entry
 
+	seriesBuf    []RefSeries
+	sampleBuf    []RefSample
+	tombstoneBuf []Stone
+
 	err error
 }
 
@@ -996,7 +1013,8 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
 }
 
 func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
-	series := []RefSeries{}
+	r.seriesBuf = r.seriesBuf[:0]
+
 	dec := decbuf{b: b}
 
 	for len(dec.b) > 0 && dec.err() == nil {
@@ -1010,7 +1028,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
 		}
 		sort.Sort(lset)
 
-		series = append(series, RefSeries{
+		r.seriesBuf = append(r.seriesBuf, RefSeries{
 			Ref:    ref,
 			Labels: lset,
 		})
@@ -1019,16 +1037,16 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
 		return nil, dec.err()
 	}
 	if len(dec.b) > 0 {
-		return series, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
+		return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
 	}
-	return series, nil
+	return r.seriesBuf, nil
 }
 
 func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
 	if len(b) == 0 {
 		return nil, nil
 	}
-	samples := []RefSample{}
+	r.sampleBuf = r.sampleBuf[:0]
 	dec := decbuf{b: b}
 
 	var (
@@ -1041,7 +1059,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
 		dtime := dec.varint64()
 		val := dec.be64()
 
-		samples = append(samples, RefSample{
+		r.sampleBuf = append(r.sampleBuf, RefSample{
 			Ref: uint64(int64(baseRef) + dref),
 			T:   baseTime + dtime,
 			V:   math.Float64frombits(val),
@@ -1049,20 +1067,20 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
 	}
 
 	if dec.err() != nil {
-		return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples))
+		return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf))
 	}
 	if len(dec.b) > 0 {
-		return samples, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
+		return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
 	}
-	return samples, nil
+	return r.sampleBuf, nil
 }
 
 func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
 	dec := &decbuf{b: b}
-	var stones []Stone
+	r.tombstoneBuf = r.tombstoneBuf[:0]
 
 	for dec.len() > 0 && dec.err() == nil {
-		stones = append(stones, Stone{
+		r.tombstoneBuf = append(r.tombstoneBuf, Stone{
 			ref: dec.be64(),
 			intervals: Intervals{
 				{Mint: dec.varint64(), Maxt: dec.varint64()},
@@ -1073,7 +1091,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
 		return nil, dec.err()
 	}
 	if len(dec.b) > 0 {
-		return stones, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
+		return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
 	}
-	return stones, nil
+	return r.tombstoneBuf, nil
 }
diff --git a/vendor/vendor.json b/vendor/vendor.json
index bb499c8b4..f5c7a015f 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -859,22 +859,22 @@
 			"revisionTime": "2016-04-11T19:08:41Z"
 		},
 		{
-			"checksumSHA1": "AoNkGFKIyLNi4a/QcO8p5D7xIXs=",
+			"checksumSHA1": "mDKxPAubVLTWW/Gar13m7YDHSek=",
 			"path": "github.com/prometheus/tsdb",
-			"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
-			"revisionTime": "2017-09-07T11:04:02Z"
+			"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
+			"revisionTime": "2017-09-11T08:41:33Z"
 		},
 		{
 			"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
 			"path": "github.com/prometheus/tsdb/chunks",
-			"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
-			"revisionTime": "2017-09-07T11:04:02Z"
+			"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
+			"revisionTime": "2017-09-11T08:41:33Z"
 		},
 		{
 			"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
 			"path": "github.com/prometheus/tsdb/labels",
-			"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
-			"revisionTime": "2017-09-07T11:04:02Z"
+			"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
+			"revisionTime": "2017-09-11T08:41:33Z"
 		},
 		{
 			"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
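Note on the index.go change in this vendor bump: the AddSeries/Series hunks switch series chunk metadata from absolute values to deltas. Below is a small, self-contained sketch of that scheme, for illustration only: ChunkMeta is trimmed to the relevant fields, encodeChunkMetas/decodeChunkMetas are invented names, and encoding/binary's AppendUvarint/AppendVarint (Go 1.19+) stand in for tsdb's encbuf/decbuf helpers; varint error handling is elided.

package main

import (
	"encoding/binary"
	"fmt"
)

// ChunkMeta is reduced to the fields relevant to the index encoding.
type ChunkMeta struct {
	Ref              uint64
	MinTime, MaxTime int64
}

// encodeChunkMetas writes the first chunk as an absolute MinTime, a
// MaxTime-MinTime width and an absolute Ref; every following chunk is
// stored as deltas against the previous chunk.
func encodeChunkMetas(chks []ChunkMeta) []byte {
	b := binary.AppendUvarint(nil, uint64(len(chks)))
	if len(chks) == 0 {
		return b
	}
	c := chks[0]
	b = binary.AppendVarint(b, c.MinTime)
	b = binary.AppendUvarint(b, uint64(c.MaxTime-c.MinTime))
	b = binary.AppendUvarint(b, c.Ref)
	t0, ref0 := c.MaxTime, int64(c.Ref)

	for _, c := range chks[1:] {
		b = binary.AppendUvarint(b, uint64(c.MinTime-t0))
		b = binary.AppendUvarint(b, uint64(c.MaxTime-c.MinTime))
		b = binary.AppendVarint(b, int64(c.Ref)-ref0)
		t0, ref0 = c.MaxTime, int64(c.Ref)
	}
	return b
}

// decodeChunkMetas reverses encodeChunkMetas; the real reader checks
// decbuf.err() after every read instead of trusting the input.
func decodeChunkMetas(b []byte) []ChunkMeta {
	k, n := binary.Uvarint(b)
	b = b[n:]
	if k == 0 {
		return nil
	}
	mint, n := binary.Varint(b)
	b = b[n:]
	width, n := binary.Uvarint(b)
	b = b[n:]
	ref, n := binary.Uvarint(b)
	b = b[n:]

	maxt := mint + int64(width)
	chks := []ChunkMeta{{Ref: ref, MinTime: mint, MaxTime: maxt}}
	t0, ref0 := maxt, int64(ref)

	for i := uint64(1); i < k; i++ {
		dmin, m := binary.Uvarint(b)
		b = b[m:]
		dwidth, m := binary.Uvarint(b)
		b = b[m:]
		dref, m := binary.Varint(b)
		b = b[m:]

		mint := t0 + int64(dmin)
		maxt := mint + int64(dwidth)
		ref0 += dref
		chks = append(chks, ChunkMeta{Ref: uint64(ref0), MinTime: mint, MaxTime: maxt})
		t0 = maxt
	}
	return chks
}

func main() {
	// Made-up example values: two adjacent two-hour chunks of one series.
	in := []ChunkMeta{
		{Ref: 1000, MinTime: 0, MaxTime: 7200},
		{Ref: 1042, MinTime: 7200, MaxTime: 14400},
	}
	fmt.Println(decodeChunkMetas(encodeChunkMetas(in)))
	// Output: [{1000 0 7200} {1042 7200 14400}]
}

Storing the first chunk absolutely and each later chunk as (MinTime - previous MaxTime, MaxTime - MinTime, Ref - previous Ref) keeps the varints short, since chunks within a series are time-ordered and their references grow monotonically.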