diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go
index 0e75080c5d..3fcf917ce2 100644
--- a/cmd/prometheus/config.go
+++ b/cmd/prometheus/config.go
@@ -126,9 +126,13 @@ func init() {
 		"Maximum duration compacted blocks may span.",
 	)
 	cfg.fs.IntVar(
-		&cfg.tsdb.AppendableBlocks, "storage.tsdb.AppendableBlocks", 2,
+		&cfg.tsdb.AppendableBlocks, "storage.tsdb.appendable-blocks", 2,
 		"Number of head blocks that can be appended to.",
 	)
+	cfg.fs.DurationVar(
+		&cfg.tsdb.Retention, "storage.tsdb.retention", 15*24*time.Hour,
+		"How long to retain samples in the storage.",
+	)
 	cfg.fs.StringVar(
 		&cfg.localStorageEngine, "storage.local.engine", "persisted",
 		"Local storage engine. Supported values are: 'persisted' (full local storage with on-disk persistence) and 'none' (no local storage).",
diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 39aecfdbff..8f2365af43 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -70,13 +70,6 @@ func Main() int {
 		return 0
 	}
 
-	// go func() {
-	// 	for {
-	// 		time.Sleep(30 * time.Second)
-	// 		debug.FreeOSMemory()
-	// 	}
-	// }()
-
 	log.Infoln("Starting prometheus", version.Info())
 	log.Infoln("Build context", version.BuildContext())
 
diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go
index 8351252071..a19bda967f 100644
--- a/storage/tsdb/tsdb.go
+++ b/storage/tsdb/tsdb.go
@@ -34,15 +34,19 @@ type Options struct {
 	// After a new block is started for timestamp t0 or higher, appends with
 	// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
 	AppendableBlocks int
+
+	// Duration for how long to retain data.
+	Retention time.Duration
 }
 
 // Open returns a new storage backed by a tsdb database.
 func Open(path string, r prometheus.Registerer, opts *Options) (storage.Storage, error) {
 	db, err := tsdb.Open(path, nil, r, &tsdb.Options{
-		WALFlushInterval: 10 * time.Second,
-		MinBlockDuration: uint64(opts.MinBlockDuration.Seconds() * 1000),
-		MaxBlockDuration: uint64(opts.MaxBlockDuration.Seconds() * 1000),
-		AppendableBlocks: opts.AppendableBlocks,
+		WALFlushInterval:  10 * time.Second,
+		MinBlockDuration:  uint64(opts.MinBlockDuration.Seconds() * 1000),
+		MaxBlockDuration:  uint64(opts.MaxBlockDuration.Seconds() * 1000),
+		AppendableBlocks:  opts.AppendableBlocks,
+		RetentionDuration: uint64(opts.Retention.Seconds() * 1000),
 	})
 	if err != nil {
 		return nil, err
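Note on the wiring above: the new -storage.tsdb.retention flag is parsed as a Go time.Duration and handed to tsdb as a plain integer millisecond count. A minimal standalone sketch of that conversion (not part of the patch; names are illustrative):

package main

import (
	"fmt"
	"time"
)

// durationToMillis mirrors the conversion used above: tsdb options are
// plain integers in milliseconds, while the flag is parsed as a
// time.Duration. Both expressions below agree for whole milliseconds.
func durationToMillis(d time.Duration) uint64 {
	return uint64(d.Seconds() * 1000)
}

func main() {
	retention := 15 * 24 * time.Hour // the new flag's default
	fmt.Println(durationToMillis(retention))          // 1296000000
	fmt.Println(uint64(retention / time.Millisecond)) // same value
}
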
diff --git a/vendor/github.com/fabxc/tsdb/chunks.go b/vendor/github.com/fabxc/tsdb/chunks.go
index c0c7954a6e..b3aaeb206d 100644
--- a/vendor/github.com/fabxc/tsdb/chunks.go
+++ b/vendor/github.com/fabxc/tsdb/chunks.go
@@ -36,7 +36,7 @@ type ChunkWriter interface {
 	// must be populated.
 	// After returning successfully, the Ref fields in the ChunkMetas
 	// are set and can be used to retrieve the chunks from the written data.
-	WriteChunks(chunks ...ChunkMeta) error
+	WriteChunks(chunks ...*ChunkMeta) error
 
 	// Close writes any required finalization and closes the resources
 	// associated with the underlying writer.
@@ -156,7 +156,7 @@ func (w *chunkWriter) write(wr io.Writer, b []byte) error {
 	return err
 }
 
-func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
+func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
 	// Calculate maximum space we need and cut a new segment in case
 	// we don't fit into the current one.
 	maxLen := int64(binary.MaxVarintLen32)
@@ -184,9 +184,7 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
 	}
 	seq := uint64(w.seq()) << 32
 
-	for i := range chks {
-		chk := &chks[i]
-
+	for _, chk := range chks {
 		chk.Ref = seq | uint64(w.n)
 
 		n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
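Note: switching WriteChunks (and the types below) from ChunkMeta values to *ChunkMeta pointers means the Ref write-back inside the writer is visible to every other holder of the same meta, which is what lets the explicit `chk := &chks[i]` indirection tricks disappear throughout this change. A toy illustration of the aliasing (not tsdb code):

package main

import "fmt"

// Meta is a stand-in for tsdb's ChunkMeta; only the Ref write-back
// behavior is illustrated here.
type Meta struct{ Ref uint64 }

// setRefs mimics WriteChunks: it assigns a reference to every meta.
// Because the slice holds pointers, callers holding the same *Meta
// elsewhere (e.g. an index writer) observe the assignment too.
func setRefs(metas ...*Meta) {
	for i, m := range metas {
		m.Ref = uint64(i + 1)
	}
}

func main() {
	shared := &Meta{}
	forIndex := []*Meta{shared} // same pointer retained by another consumer
	setRefs(shared)
	fmt.Println(forIndex[0].Ref) // 1 — the write-back is visible
}
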
diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go
index 573e9b440a..a1d7b1e5c5 100644
--- a/vendor/github.com/fabxc/tsdb/compact.go
+++ b/vendor/github.com/fabxc/tsdb/compact.go
@@ -334,7 +334,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
 
 type compactionSet interface {
 	Next() bool
-	At() (labels.Labels, []ChunkMeta)
+	At() (labels.Labels, []*ChunkMeta)
 	Err() error
 }
 
@@ -344,7 +344,7 @@ type compactionSeriesSet struct {
 	chunks ChunkReader
 
 	l   labels.Labels
-	c   []ChunkMeta
+	c   []*ChunkMeta
 	err error
 }
 
@@ -365,9 +365,7 @@ func (c *compactionSeriesSet) Next() bool {
 	if c.err != nil {
 		return false
 	}
-	for i := range c.c {
-		chk := &c.c[i]
-
+	for _, chk := range c.c {
 		chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
 		if c.err != nil {
 			return false
@@ -384,7 +382,7 @@ func (c *compactionSeriesSet) Err() error {
 	return c.p.Err()
 }
 
-func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta) {
+func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta) {
 	return c.l, c.c
 }
 
@@ -393,12 +391,12 @@ type compactionMerger struct {
 	aok, bok bool
 
 	l labels.Labels
-	c []ChunkMeta
+	c []*ChunkMeta
 }
 
 type compactionSeries struct {
 	labels labels.Labels
-	chunks []ChunkMeta
+	chunks []*ChunkMeta
 }
 
 func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
@@ -459,7 +457,7 @@ func (c *compactionMerger) Err() error {
 	return c.b.Err()
 }
 
-func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) {
+func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta) {
 	return c.l, c.c
 }
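Note: compactionSet follows the same Next/At/Err iterator contract used throughout tsdb: Next reports whether another element is available, At returns the current one, and Err is checked once after iteration stops. A minimal sketch of that contract with a slice-backed set (illustrative types only, not tsdb code):

package main

import "fmt"

// entry is a stand-in for the (labels, chunk metas) pair a compactionSet yields.
type entry struct{ name string }

// sliceSet shows the Next/At/Err contract: Next advances and reports
// availability, At returns the current element, and Err surfaces a
// deferred error after Next returns false.
type sliceSet struct {
	entries []entry
	i       int
}

func (s *sliceSet) Next() bool { s.i++; return s.i <= len(s.entries) }
func (s *sliceSet) At() entry  { return s.entries[s.i-1] }
func (s *sliceSet) Err() error { return nil }

func main() {
	set := &sliceSet{entries: []entry{{"a"}, {"b"}}}
	for set.Next() {
		fmt.Println(set.At().name)
	}
	if err := set.Err(); err != nil { // check only after iteration ends
		panic(err)
	}
}
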
diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go
index 7124ce1067..ec0f0a7018 100644
--- a/vendor/github.com/fabxc/tsdb/db.go
+++ b/vendor/github.com/fabxc/tsdb/db.go
@@ -207,10 +207,13 @@ func (db *DB) run() {
 
 			var merr MultiError
 
-			changes, err := db.compact()
+			changes1, err := db.retentionCutoff()
 			merr.Add(err)
 
-			if changes {
+			changes2, err := db.compact()
+			merr.Add(err)
+
+			if changes1 || changes2 {
 				merr.Add(db.reloadBlocks())
 			}
 			if err := merr.Err(); err != nil {
@@ -223,11 +226,31 @@ func (db *DB) run() {
 	}
 }
 
+func (db *DB) retentionCutoff() (bool, error) {
+	if db.opts.RetentionDuration == 0 {
+		return false, nil
+	}
+
+	db.mtx.RLock()
+	defer db.mtx.RUnlock()
+
+	// We don't count the span covered by head blocks towards the
+	// retention time as it generally makes up a fraction of it.
+	if len(db.persisted) == 0 {
+		return false, nil
+	}
+
+	last := db.persisted[len(db.persisted)-1]
+	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
+
+	return retentionCutoff(db.dir, mint)
+}
+
 func (db *DB) compact() (changes bool, err error) {
-	// Check whether we have pending head blocks that are ready to be persisted.
-	// They have the highest priority.
 	db.headmtx.RLock()
 
+	// Check whether we have pending head blocks that are ready to be persisted.
+	// They have the highest priority.
 	var singles []*headBlock
 
 	// Collect head blocks that are ready for compaction. Write them after
@@ -297,38 +320,40 @@ Loop:
 	return changes, nil
 }
 
-// func (db *DB) retentionCutoff() error {
-// 	if db.opts.RetentionDuration == 0 {
-// 		return nil
-// 	}
-// 	h := db.heads[len(db.heads)-1]
-// 	t := h.meta.MinTime - int64(db.opts.RetentionDuration)
+// retentionCutoff deletes all directories of blocks in dir that are strictly
+// before mint.
+func retentionCutoff(dir string, mint int64) (bool, error) {
+	dirs, err := blockDirs(dir)
+	if err != nil {
+		return false, errors.Wrapf(err, "list block dirs %s", dir)
+	}
 
-// 	var (
-// 		blocks = db.blocks()
-// 		i      int
-// 		b      Block
-// 	)
-// 	for i, b = range blocks {
-// 		if b.Meta().MinTime >= t {
-// 			break
-// 		}
-// 	}
-// 	if i <= 1 {
-// 		return nil
-// 	}
-// 	db.logger.Log("msg", "retention cutoff", "idx", i-1)
-// 	db.removeBlocks(0, i)
+	changes := false
 
-// 	for _, b := range blocks[:i] {
-// 		if err := os.RemoveAll(b.Dir()); err != nil {
-// 			return errors.Wrap(err, "removing old block")
-// 		}
-// 	}
-// 	return nil
-// }
+	for _, dir := range dirs {
+		meta, err := readMetaFile(dir)
+		if err != nil {
+			return changes, errors.Wrapf(err, "read block meta %s", dir)
+		}
+		// The first block we encounter marks that we crossed the boundary
+		// of deletable blocks.
+		if meta.MaxTime >= mint {
+			break
+		}
+		changes = true
+
+		if err := os.RemoveAll(dir); err != nil {
+			return changes, err
+		}
+	}
+
+	return changes, nil
+}
 
 func (db *DB) reloadBlocks() error {
+	var cs []io.Closer
+	defer func() { closeAll(cs...) }()
+
 	db.mtx.Lock()
 	defer db.mtx.Unlock()
 
@@ -381,11 +406,11 @@ func (db *DB) reloadBlocks() error {
 		seqBlocks[meta.Sequence] = b
 	}
 
+	// Close all blocks that we no longer need. They are closed after releasing
+	// all locks to avoid a questionable locking order.
 	for seq, b := range db.seqBlocks {
 		if nb, ok := seqBlocks[seq]; !ok || nb != b {
-			if err := b.Close(); err != nil {
-				return errors.Wrapf(err, "closing removed block %d", b.Meta().Sequence)
-			}
+			cs = append(cs, b)
 		}
 	}
 
@@ -401,9 +426,8 @@ func (db *DB) Close() error {
 	close(db.stopc)
 	<-db.donec
 
-	// Lock mutex and leave it locked so we panic if there's a bug causing
-	// the block to be used afterwards.
 	db.mtx.Lock()
+	defer db.mtx.Unlock()
 
 	var g errgroup.Group
 
@@ -427,14 +451,20 @@ func (db *DB) Appender() Appender {
 	db.mtx.RLock()
 	a := &dbAppender{db: db}
 
+	// Only instantiate appenders after releasing the headmtx to avoid a
+	// questionable locking order.
 	db.headmtx.RLock()
 
-	for _, b := range db.appendable() {
-		a.heads = append(a.heads, b.Appender().(*headAppender))
-	}
+	app := db.appendable()
+	heads := make([]*headBlock, len(app))
+	copy(heads, app)
 
 	db.headmtx.RUnlock()
 
+	for _, b := range heads {
+		a.heads = append(a.heads, b.Appender().(*headAppender))
+	}
+
 	return a
}
 
@@ -486,24 +516,30 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
 	if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
 		a.db.headmtx.Lock()
 
+		var newHeads []*headBlock
+
 		if err := a.db.ensureHead(t); err != nil {
 			a.db.headmtx.Unlock()
 			return nil, err
 		}
 
 		if len(a.heads) == 0 {
-			for _, b := range a.db.appendable() {
-				a.heads = append(a.heads, b.Appender().(*headAppender))
-			}
+			newHeads = append(newHeads, a.db.appendable()...)
 		} else {
 			maxSeq := a.heads[len(a.heads)-1].meta.Sequence
 			for _, b := range a.db.appendable() {
 				if b.meta.Sequence > maxSeq {
-					a.heads = append(a.heads, b.Appender().(*headAppender))
+					newHeads = append(newHeads, b)
 				}
 			}
 		}
 
 		a.db.headmtx.Unlock()
+
+		// Instantiate appenders after releasing headmtx to avoid a
+		// questionable locking order.
+		for _, b := range newHeads {
+			a.heads = append(a.heads, b.Appender().(*headAppender))
+		}
 	}
 
 	for i := len(a.heads) - 1; i >= 0; i-- {
 		if h := a.heads[i]; t >= h.meta.MinTime {
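Note: the cutoff is anchored on the newest persisted block's MaxTime rather than wall-clock time, and blocks are scanned oldest-first until the first one that reaches past the cutoff. A self-contained sketch of that rule (hypothetical directory names and timestamps, all in milliseconds):

package main

import (
	"fmt"
	"time"
)

// blockMeta is a simplified stand-in for tsdb's block meta: a time range
// in milliseconds, as stored in each block's meta file.
type blockMeta struct {
	dir              string
	minTime, maxTime int64
}

func main() {
	retention := int64(15 * 24 * time.Hour / time.Millisecond)

	// Ordered persisted blocks; the newest one anchors the cutoff, so
	// wall-clock time never enters the calculation.
	blocks := []blockMeta{
		{"b-000001", 0, 7_200_000},
		{"b-000002", 7_200_000, 14_400_000},
		{"b-000003", 14_400_000, 1_310_400_000},
	}
	last := blocks[len(blocks)-1]
	mint := last.maxTime - retention // 14_400_000 here

	for _, b := range blocks {
		if b.maxTime >= mint {
			break // first non-deletable block ends the scan
		}
		fmt.Println("would delete", b.dir) // only b-000001
	}
}
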
diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go
index 211cadf775..eed2ff2222 100644
--- a/vendor/github.com/fabxc/tsdb/head.go
+++ b/vendor/github.com/fabxc/tsdb/head.go
@@ -10,7 +10,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/bradfitz/slice"
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/go-kit/kit/log"
@@ -43,13 +42,11 @@ type headBlock struct {
 	wal *WAL
 
 	activeWriters uint64
+	closed        bool
 
 	// descs holds all chunk descs for the head block. Each chunk implicitly
 	// is assigned the index as its ID.
 	series []*memSeries
-	// mapping maps a series ID to its position in an ordered list
-	// of all series. The orderDirty flag indicates that it has gone stale.
-	mapper *positionMapper
 	// hashes contains a collision map of label set hashes of chunks
 	// to their chunk descs.
 	hashes map[uint64][]*memSeries
@@ -105,7 +102,6 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
 		hashes:   map[uint64][]*memSeries{},
 		values:   map[string]stringset{},
 		postings: &memPostings{m: make(map[term][]uint32)},
-		mapper:   newPositionMapper(nil),
 		meta:     *meta,
 	}
 
@@ -131,8 +127,6 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
 		return nil, errors.Wrap(err, "consume WAL")
 	}
 
-	h.updateMapping()
-
 	return h, nil
 }
 
@@ -144,9 +138,8 @@ func (h *headBlock) inBounds(t int64) bool {
 
 // Close syncs all data and closes underlying resources of the head block.
 func (h *headBlock) Close() error {
-	// Lock mutex and leave it locked so we panic if there's a bug causing
-	// the block to be used afterwards.
 	h.mtx.Lock()
+	defer h.mtx.Unlock()
 
 	if err := h.wal.Close(); err != nil {
 		return errors.Wrapf(err, "close WAL for head %s", h.dir)
@@ -163,6 +156,8 @@ func (h *headBlock) Close() error {
 	if meta.ULID == h.meta.ULID {
 		return writeMetaFile(h.dir, &h.meta)
 	}
+
+	h.closed = true
 	return nil
 }
 
@@ -182,6 +177,10 @@ func (h *headBlock) Appender() Appender {
 	atomic.AddUint64(&h.activeWriters, 1)
 
 	h.mtx.RLock()
+
+	if h.closed {
+		panic(fmt.Sprintf("block %s already closed", h.dir))
+	}
 	return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
 }
 
@@ -451,7 +450,7 @@ func (h *headIndexReader) Postings(name, value string) (Postings, error) {
 }
 
 // Series returns the series for the given reference.
-func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
+func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
 
@@ -459,13 +458,13 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error)
 		return nil, nil, ErrNotFound
 	}
 	s := h.series[ref]
-	metas := make([]ChunkMeta, 0, len(s.chunks))
+	metas := make([]*ChunkMeta, 0, len(s.chunks))
 
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
 
 	for i, c := range s.chunks {
-		metas = append(metas, ChunkMeta{
+		metas = append(metas, &ChunkMeta{
 			MinTime: c.minTime,
 			MaxTime: c.maxTime,
 			Ref:     (uint64(ref) << 32) | uint64(i),
@@ -527,43 +526,26 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
 	return s
 }
 
-func (h *headBlock) updateMapping() {
-	h.mtx.RLock()
-
-	if h.mapper.sortable != nil && h.mapper.Len() == len(h.series) {
-		h.mtx.RUnlock()
-		return
-	}
-
-	series := make([]*memSeries, len(h.series))
-	copy(series, h.series)
-
-	h.mtx.RUnlock()
-
-	s := slice.SortInterface(series, func(i, j int) bool {
-		return labels.Compare(series[i].lset, series[j].lset) < 0
-	})
-
-	h.mapper.update(s)
-}
-
 // remapPostings changes the order of the postings from their ID to the ordering
 // of the series they reference.
 // Returned postings no longer have monotonic IDs and MUST NOT be used for regular
 // postings set operations, i.e. intersect and merge.
 func (h *headBlock) remapPostings(p Postings) Postings {
-	list, err := expandPostings(p)
-	if err != nil {
-		return errPostings{err: err}
+	// Expand the postings so they can be sorted by their series' label sets.
+	ep := make([]uint32, 0, 64)
+
+	for p.Next() {
+		ep = append(ep, p.At())
+	}
+	if err := p.Err(); err != nil {
+		return errPostings{err: errors.Wrap(err, "expand postings")}
 	}
 
-	h.mapper.mtx.Lock()
-	defer h.mapper.mtx.Unlock()
-
-	h.updateMapping()
-	h.mapper.Sort(list)
-
-	return newListPostings(list)
+	sort.Slice(ep, func(i, j int) bool {
+		return labels.Compare(h.series[ep[i]].lset, h.series[ep[j]].lset) < 0
+	})
+	return newListPostings(ep)
 }
 
 type memSeries struct {
@@ -675,50 +657,3 @@ func (it *memSafeIterator) At() (int64, float64) {
 	s := it.buf[4-(it.total-it.i)]
 	return s.t, s.v
 }
-
-// positionMapper stores a position mapping from unsorted to
-// sorted indices of a sortable collection.
-type positionMapper struct {
-	mtx      sync.RWMutex
-	sortable sort.Interface
-	iv, fw   []int
-}
-
-func newPositionMapper(s sort.Interface) *positionMapper {
-	m := &positionMapper{}
-	if s != nil {
-		m.update(s)
-	}
-	return m
-}
-
-func (m *positionMapper) Len() int           { return m.sortable.Len() }
-func (m *positionMapper) Less(i, j int) bool { return m.sortable.Less(i, j) }
-
-func (m *positionMapper) Swap(i, j int) {
-	m.sortable.Swap(i, j)
-
-	m.iv[i], m.iv[j] = m.iv[j], m.iv[i]
-}
-
-func (m *positionMapper) Sort(l []uint32) {
-	slice.Sort(l, func(i, j int) bool {
-		return m.fw[l[i]] < m.fw[l[j]]
-	})
-}
-
-func (m *positionMapper) update(s sort.Interface) {
-	m.sortable = s
-
-	m.iv = make([]int, s.Len())
-	m.fw = make([]int, s.Len())
-
-	for i := range m.iv {
-		m.iv[i] = i
-	}
-	sort.Sort(m)
-
-	for i, k := range m.iv {
-		m.fw[k] = i
-	}
-}
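Note: with the positionMapper gone, remapPostings sorts the expanded postings IDs directly by the label sets of the series they point to (the comparison must index through ep[i]/ep[j], not i/j, which the body above reflects). A stripped-down illustration with plain strings standing in for label sets:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// series is indexed by ID, in insertion order (not sorted by labels),
	// mirroring headBlock.series.
	series := []string{`{job="b"}`, `{job="a"}`, `{job="c"}`}

	// A postings list of series IDs, monotonically increasing.
	ep := []uint32{0, 1, 2}

	// Reorder the IDs by the label sets they reference, as remapPostings
	// does with sort.Slice. The result is no longer monotonic, so it must
	// not be fed into intersect/merge operations.
	sort.Slice(ep, func(i, j int) bool {
		return series[ep[i]] < series[ep[j]]
	})

	fmt.Println(ep) // [1 0 2]
}
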
diff --git a/vendor/github.com/fabxc/tsdb/index.go b/vendor/github.com/fabxc/tsdb/index.go
index 23771f035e..f957085763 100644
--- a/vendor/github.com/fabxc/tsdb/index.go
+++ b/vendor/github.com/fabxc/tsdb/index.go
@@ -33,7 +33,7 @@ type IndexWriter interface {
 	// of chunks that the index can reference.
 	// The reference number is used to resolve a series against the postings
 	// list iterator. It only has to be available during the write processing.
-	AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error
+	AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error
 
 	// WriteLabelIndex serializes an index from label names to values.
 	// The passed in values are chained tuples of strings of the length of names.
@@ -49,8 +49,8 @@ type IndexWriter interface {
 
 type indexWriterSeries struct {
 	labels labels.Labels
-	chunks []ChunkMeta // series file offset of chunks
-	offset uint32      // index file offset of series reference
+	chunks []*ChunkMeta // series file offset of chunks
+	offset uint32       // index file offset of series reference
 }
 
 // indexWriter implements the IndexWriter interface for the standard
@@ -142,7 +142,7 @@ func (w *indexWriter) writeMeta() error {
 	return w.write(w.bufw, b[:])
 }
 
-func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
+func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
 	if _, ok := w.series[ref]; ok {
 		return errors.Errorf("series with reference %d already added", ref)
 	}
@@ -419,7 +419,7 @@ type IndexReader interface {
 	Postings(name, value string) (Postings, error)
 
 	// Series returns the series for the given reference.
-	Series(ref uint32) (labels.Labels, []ChunkMeta, error)
+	Series(ref uint32) (labels.Labels, []*ChunkMeta, error)
 
 	// LabelIndices returns the label pairs for which indices exist.
 	LabelIndices() ([][]string, error)
@@ -599,7 +599,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
 	return res, nil
 }
 
-func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
+func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
 	k, n := binary.Uvarint(r.b[ref:])
 	if n < 1 {
 		return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
@@ -642,7 +642,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
 	}
 	b = b[n:]
 
-	chunks := make([]ChunkMeta, 0, k)
+	chunks := make([]*ChunkMeta, 0, k)
 
 	for i := 0; i < int(k); i++ {
 		firstTime, n := binary.Varint(b)
@@ -663,7 +663,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
 	}
 	b = b[n:]
 
-	chunks = append(chunks, ChunkMeta{
+	chunks = append(chunks, &ChunkMeta{
 		Ref:     o,
 		MinTime: firstTime,
 		MaxTime: lastTime,
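Note: indexReader.Series above decodes chunk metadata with the varint primitives from encoding/binary. A round-trip sketch of that shape (illustrative only — tsdb's actual on-disk layout adds further framing around these fields):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode a (minTime, maxTime, ref) triple with the same varint
	// primitives the index reader decodes with: signed varints for the
	// timestamps, an unsigned varint for the chunk reference.
	buf := make([]byte, 3*binary.MaxVarintLen64)
	n := binary.PutVarint(buf, 1000)     // first sample time
	n += binary.PutVarint(buf[n:], 2000) // last sample time
	n += binary.PutUvarint(buf[n:], 256) // chunk reference
	b := buf[:n]

	minT, m := binary.Varint(b)
	b = b[m:]
	maxT, m := binary.Varint(b)
	b = b[m:]
	ref, _ := binary.Uvarint(b)

	fmt.Println(minT, maxT, ref) // 1000 2000 256
}
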
diff --git a/vendor/github.com/fabxc/tsdb/querier.go b/vendor/github.com/fabxc/tsdb/querier.go
index a397b9a632..c027a8dc0e 100644
--- a/vendor/github.com/fabxc/tsdb/querier.go
+++ b/vendor/github.com/fabxc/tsdb/querier.go
@@ -66,6 +66,12 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 
 		// TODO(fabxc): find nicer solution.
 		if hb, ok := b.(*headBlock); ok {
+			// TODO(fabxc): temporary guard until the refactoring is done.
+			hb.mtx.RLock()
+			if hb.closed {
+				panic(fmt.Sprintf("block %s already closed", hb.dir))
+			}
+			hb.mtx.RUnlock()
 			q.postingsMapper = hb.remapPostings
 		}
 
@@ -109,7 +115,7 @@ func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
 	r := q.blocks[0].Select(ms...)
 
 	for _, s := range q.blocks[1:] {
-		r = newPartitionSeriesSet(r, s.Select(ms...))
+		r = newMergedSeriesSet(r, s.Select(ms...))
 	}
 	return r
 }
@@ -135,15 +141,6 @@ type blockQuerier struct {
 	mint, maxt int64
 }
 
-func newBlockQuerier(ix IndexReader, c ChunkReader, mint, maxt int64) *blockQuerier {
-	return &blockQuerier{
-		mint:   mint,
-		maxt:   maxt,
-		index:  ix,
-		chunks: c,
-	}
-}
-
 func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 	var (
 		its []Postings
@@ -282,39 +279,14 @@ func (nopSeriesSet) At() Series { return nil }
 func (nopSeriesSet) Err() error { return nil }
 
 type mergedSeriesSet struct {
-	sets []SeriesSet
-
-	cur int
-	err error
-}
-
-func (s *mergedSeriesSet) At() Series { return s.sets[s.cur].At() }
-func (s *mergedSeriesSet) Err() error { return s.sets[s.cur].Err() }
-
-func (s *mergedSeriesSet) Next() bool {
-	// TODO(fabxc): We just emit the sets one after one. They are each
-	// lexicographically sorted. Should we emit their union sorted too?
-	if s.sets[s.cur].Next() {
-		return true
-	}
-
-	if s.cur == len(s.sets)-1 {
-		return false
-	}
-	s.cur++
-
-	return s.Next()
-}
-
-type partitionSeriesSet struct {
 	a, b SeriesSet
 
 	cur          Series
 	adone, bdone bool
 }
 
-func newPartitionSeriesSet(a, b SeriesSet) *partitionSeriesSet {
-	s := &partitionSeriesSet{a: a, b: b}
+func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
+	s := &mergedSeriesSet{a: a, b: b}
 	// Initialize first elements of both sets as Next() needs
 	// one element look-ahead.
 	s.adone = !s.a.Next()
@@ -323,18 +295,18 @@ func newPartitionSeriesSet(a, b SeriesSet) *partitionSeriesSet {
 	return s
 }
 
-func (s *partitionSeriesSet) At() Series {
+func (s *mergedSeriesSet) At() Series {
 	return s.cur
 }
 
-func (s *partitionSeriesSet) Err() error {
+func (s *mergedSeriesSet) Err() error {
 	if s.a.Err() != nil {
 		return s.a.Err()
 	}
 	return s.b.Err()
 }
 
-func (s *partitionSeriesSet) compare() int {
+func (s *mergedSeriesSet) compare() int {
 	if s.adone {
 		return 1
 	}
@@ -344,7 +316,7 @@ func (s *partitionSeriesSet) compare() int {
 	return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
 }
 
-func (s *partitionSeriesSet) Next() bool {
+func (s *mergedSeriesSet) Next() bool {
 	if s.adone && s.bdone || s.Err() != nil {
 		return false
 	}
@@ -370,7 +342,7 @@ func (s *partitionSeriesSet) Next() bool {
 
 type chunkSeriesSet interface {
 	Next() bool
-	At() (labels.Labels, []ChunkMeta)
+	At() (labels.Labels, []*ChunkMeta)
 	Err() error
 }
 
@@ -382,12 +354,12 @@ type baseChunkSeries struct {
 	absent []string // labels that must be unset in results.
 
 	lset   labels.Labels
-	chks   []ChunkMeta
+	chks   []*ChunkMeta
 	err    error
 }
 
-func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
-func (s *baseChunkSeries) Err() error                       { return s.err }
+func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
+func (s *baseChunkSeries) Err() error                        { return s.err }
 
 func (s *baseChunkSeries) Next() bool {
 Outer:
@@ -425,20 +397,18 @@ type populatedChunkSeries struct {
 	mint, maxt int64
 
 	err  error
-	chks []ChunkMeta
+	chks []*ChunkMeta
 	lset labels.Labels
 }
 
-func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
-func (s *populatedChunkSeries) Err() error                       { return s.err }
+func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta) { return s.lset, s.chks }
+func (s *populatedChunkSeries) Err() error                        { return s.err }
 
 func (s *populatedChunkSeries) Next() bool {
 	for s.set.Next() {
 		lset, chks := s.set.At()
 
-		for i := range chks {
-			c := &chks[i]
-
+		for i, c := range chks {
 			if c.MaxTime < s.mint {
 				chks = chks[1:]
 				continue
@@ -493,7 +463,7 @@ func (s *blockSeriesSet) Err() error { return s.err }
 // time series data.
 type chunkSeries struct {
 	labels labels.Labels
-	chunks []ChunkMeta // in-order chunk refs
+	chunks []*ChunkMeta // in-order chunk refs
 }
 
 func (s *chunkSeries) Labels() labels.Labels {
@@ -587,13 +557,13 @@ func (it *chainedSeriesIterator) Err() error {
 // chunkSeriesIterator implements a series iterator on top
 // of a list of time-sorted, non-overlapping chunks.
 type chunkSeriesIterator struct {
-	chunks []ChunkMeta
+	chunks []*ChunkMeta
 	i      int
 	cur    chunks.Iterator
 }
 
-func newChunkSeriesIterator(cs []ChunkMeta) *chunkSeriesIterator {
+func newChunkSeriesIterator(cs []*ChunkMeta) *chunkSeriesIterator {
 	return &chunkSeriesIterator{
 		chunks: cs,
 		i:      0,
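Note: the old mergedSeriesSet simply emitted its inputs one after another; the renamed type is a true two-way merge with one-element look-ahead, and Select above handles k blocks by chaining pairs. A minimal sketch of the look-ahead merge over sorted string keys (on equal keys tsdb chains the two series into one; here they are emitted once):

package main

import "fmt"

// merge walks two sorted slices with one-element look-ahead, the same
// shape as mergedSeriesSet.Next: the smaller head is emitted, and on a
// tie both inputs advance and yield a single output element.
func merge(a, b []string) []string {
	var out []string
	for len(a) > 0 || len(b) > 0 {
		switch {
		case len(b) == 0 || (len(a) > 0 && a[0] < b[0]):
			out, a = append(out, a[0]), a[1:]
		case len(a) == 0 || b[0] < a[0]:
			out, b = append(out, b[0]), b[1:]
		default: // equal keys: emit once, advance both
			out, a, b = append(out, a[0]), a[1:], b[1:]
		}
	}
	return out
}

func main() {
	fmt.Println(merge([]string{"a", "c"}, []string{"b", "c", "d"}))
	// [a b c d]
}
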
diff --git a/vendor/github.com/fabxc/tsdb/wal.go b/vendor/github.com/fabxc/tsdb/wal.go
index b06425e076..2a49dcf41e 100644
--- a/vendor/github.com/fabxc/tsdb/wal.go
+++ b/vendor/github.com/fabxc/tsdb/wal.go
@@ -63,7 +63,7 @@ const (
 
 // OpenWAL opens or creates a write ahead log in the given directory.
 // The WAL must be read completely before new data is written.
-func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) {
+func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) {
 	dir = filepath.Join(dir, walDirName)
 
 	if err := os.MkdirAll(dir, 0777); err != nil {
@@ -73,10 +73,13 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error
 	if err != nil {
 		return nil, err
 	}
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
 
 	w := &WAL{
 		dirFile:       df,
-		logger:        l,
+		logger:        logger,
 		flushInterval: flushInterval,
 		donec:         make(chan struct{}),
 		stopc:         make(chan struct{}),
@@ -95,11 +98,7 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error
 // Reader returns a new reader over the write ahead log data.
 // It must be completely consumed before writing to the WAL.
 func (w *WAL) Reader() *WALReader {
-	var rs []io.ReadCloser
-	for _, f := range w.files {
-		rs = append(rs, f)
-	}
-	return NewWALReader(rs...)
+	return NewWALReader(w.logger, w)
 }
 
 // Log writes a batch of new series labels and samples to the log.
@@ -126,21 +125,15 @@ func (w *WAL) initSegments() error {
 	if len(fns) == 0 {
 		return nil
 	}
-	if len(fns) > 1 {
-		for _, fn := range fns[:len(fns)-1] {
-			f, err := os.Open(fn)
-			if err != nil {
-				return err
-			}
-			w.files = append(w.files, f)
+	// We must open all files in read/write mode as we may have to truncate
+	// along the way and any file may become the tail.
+	for _, fn := range fns {
+		f, err := os.OpenFile(fn, os.O_RDWR, 0666)
+		if err != nil {
+			return err
 		}
+		w.files = append(w.files, f)
 	}
-	// The most recent WAL file is the one we have to keep appending to.
-	f, err := os.OpenFile(fns[len(fns)-1], os.O_RDWR, 0666)
-	if err != nil {
-		return err
-	}
-	w.files = append(w.files, f)
 
 	// Consume and validate meta headers.
 	for _, f := range w.files {
@@ -275,7 +268,7 @@ func (w *WAL) Close() error {
 	// On opening, a WAL must be fully consumed once. Afterwards
 	// only the current segment will still be open.
 	if tf := w.tail(); tf != nil {
-		return tf.Close()
+		return errors.Wrapf(tf.Close(), "closing WAL tail %s", tf.Name())
 	}
 	return nil
 }
@@ -413,7 +406,9 @@ func (w *WAL) encodeSamples(samples []refdSample) error {
 
 // WALReader decodes and emits write ahead log entries.
 type WALReader struct {
-	rs    []io.ReadCloser
+	logger log.Logger
+
+	wal   *WAL
 	cur   int
 	buf   []byte
 	crc32 hash.Hash32
@@ -424,12 +419,17 @@ type WALReader struct {
 }
 
-// NewWALReader returns a new WALReader over the sequence of the given ReadClosers.
-func NewWALReader(rs ...io.ReadCloser) *WALReader {
-	return &WALReader{
-		rs:    rs,
-		buf:   make([]byte, 0, 128*4096),
-		crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
+// NewWALReader returns a new WALReader over the given WAL.
+func NewWALReader(logger log.Logger, w *WAL) *WALReader {
+	if logger == nil {
+		logger = log.NewNopLogger()
 	}
+	r := &WALReader{
+		logger: logger,
+		wal:    w,
+		buf:    make([]byte, 0, 128*4096),
+		crc32:  crc32.New(crc32.MakeTable(crc32.Castagnoli)),
+	}
+	return r
 }
 
 // At returns the last decoded entry of labels or samples.
@@ -446,19 +446,18 @@ func (r *WALReader) Err() error {
 
 // nextEntry retrieves the next entry. It is also used as a testing hook.
 func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) {
-	if r.cur >= len(r.rs) {
+	if r.cur >= len(r.wal.files) {
 		return 0, 0, nil, io.EOF
 	}
-	cr := r.rs[r.cur]
+	cf := r.wal.files[r.cur]
 
-	et, flag, b, err := r.entry(cr)
+	et, flag, b, err := r.entry(cf)
 	// If we reached the end of the reader, advance to the next one
 	// and close.
 	// Do not close on the last one as it will still be appended to.
-	// XXX(fabxc): leaky abstraction.
-	if err == io.EOF && r.cur < len(r.rs)-1 {
+	if err == io.EOF && r.cur < len(r.wal.files)-1 {
 		// Current reader completed, close and move to the next one.
-		if err := cr.Close(); err != nil {
+		if err := cf.Close(); err != nil {
 			return 0, 0, nil, err
 		}
 		r.cur++
@@ -473,14 +472,46 @@ func (r *WALReader) Next() bool {
 	r.labels = r.labels[:0]
 	r.samples = r.samples[:0]
 
-	et, flag, b, err := r.nextEntry()
+	if r.cur >= len(r.wal.files) {
+		return false
+	}
+	cf := r.wal.files[r.cur]
+
+	// Save position after last valid entry if we have to truncate the WAL.
+	lastOffset, err := cf.Seek(0, os.SEEK_CUR)
 	if err != nil {
-		if err != io.EOF {
+		r.err = err
+		return false
+	}
+
+	et, flag, b, err := r.entry(cf)
+	// If we reached the end of the reader, advance to the next one
+	// and close.
+	// Do not close on the last one as it will still be appended to.
+	if err == io.EOF {
+		if r.cur == len(r.wal.files)-1 {
+			return false
+		}
+		// Current reader completed, close and move to the next one.
+		if err := cf.Close(); err != nil {
 			r.err = err
+			return false
+		}
+		r.cur++
+		return r.Next()
+	}
+	if err != nil {
+		r.err = err
+
+		if _, ok := err.(walCorruptionErr); ok {
+			r.err = r.truncate(lastOffset)
 		}
 		return false
 	}
 
+	// In decoding below we never return a walCorruptionErr for now.
+	// Those should generally be caught by entry decoding before.
+
 	switch et {
 	case WALEntrySamples:
 		if err := r.decodeSamples(flag, b); err != nil {
@@ -490,19 +521,52 @@ func (r *WALReader) Next() bool {
 		if err := r.decodeSeries(flag, b); err != nil {
 			r.err = err
 		}
-	default:
-		r.err = errors.Errorf("unknown WAL entry type %d", et)
 	}
 
 	return r.err == nil
 }
 
+func (r *WALReader) current() *os.File {
+	return r.wal.files[r.cur]
+}
+
+// truncate the WAL after the last valid entry.
+func (r *WALReader) truncate(lastOffset int64) error {
+	r.logger.Log("msg", "WAL corruption detected; truncating",
+		"err", r.err, "file", r.current().Name(), "pos", lastOffset)
+
+	// Close and delete all files after the current one.
+	for _, f := range r.wal.files[r.cur+1:] {
+		if err := f.Close(); err != nil {
+			return err
+		}
+		if err := os.Remove(f.Name()); err != nil {
+			return err
+		}
+	}
+	r.wal.files = r.wal.files[:r.cur+1]
+
+	// Seek the current file to the last valid offset where we continue writing from.
+	_, err := r.current().Seek(lastOffset, os.SEEK_SET)
+	return err
+}
+
+// walCorruptionErr is a type wrapper for errors that indicate WAL corruption
+// and trigger a truncation.
+type walCorruptionErr error
+
+func walCorruptionErrf(s string, args ...interface{}) error {
+	return walCorruptionErr(errors.Errorf(s, args...))
+}
+
 func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
 	r.crc32.Reset()
 	tr := io.TeeReader(cr, r.crc32)
 
 	b := make([]byte, 6)
-	if _, err := tr.Read(b); err != nil {
+	if n, err := tr.Read(b); err != nil {
 		return 0, 0, nil, err
+	} else if n != 6 {
+		return 0, 0, nil, walCorruptionErrf("invalid entry header size %d", n)
 	}
 
 	var (
@@ -514,21 +578,28 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
 	if etype == 0 {
 		return 0, 0, nil, io.EOF
 	}
+	if etype != WALEntrySeries && etype != WALEntrySamples {
+		return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype)
+	}
 
 	if length > len(r.buf) {
 		r.buf = make([]byte, length)
 	}
 	buf := r.buf[:length]
 
-	if _, err := tr.Read(buf); err != nil {
+	if n, err := tr.Read(buf); err != nil {
 		return 0, 0, nil, err
+	} else if n != length {
+		return 0, 0, nil, walCorruptionErrf("invalid entry body size %d", n)
 	}
-	_, err := cr.Read(b[:4])
-	if err != nil {
+
+	if n, err := cr.Read(b[:4]); err != nil {
 		return 0, 0, nil, err
+	} else if n != 4 {
+		return 0, 0, nil, walCorruptionErrf("invalid checksum length %d", n)
 	}
 	if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp {
-		return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp)
+		return 0, 0, nil, walCorruptionErrf("unexpected CRC32 checksum %x, want %x", has, exp)
 	}
 
 	return etype, flag, buf, nil
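Note: the reader now distinguishes corruption (walCorruptionErr) from ordinary errors and repairs the WAL by truncating after the last valid entry and deleting any later segment files. A self-contained sketch of that recovery idea for a single file of length-prefixed, CRC32-checksummed records (not tsdb's actual entry framing, which also carries type and flag bytes):

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
	"os"
)

var table = crc32.MakeTable(crc32.Castagnoli)

// writeRecord appends a length-prefixed, checksummed record.
func writeRecord(f *os.File, payload []byte) {
	var hdr [8]byte
	binary.BigEndian.PutUint32(hdr[:4], uint32(len(payload)))
	binary.BigEndian.PutUint32(hdr[4:], crc32.Checksum(payload, table))
	f.Write(hdr[:])
	f.Write(payload)
}

// recoverWAL scans records and truncates the file after the last valid one,
// mirroring WALReader.truncate: corrupted or partial tails are dropped so
// appending can resume from a consistent offset.
func recoverWAL(f *os.File) error {
	var offset int64
	for {
		var hdr [8]byte
		if _, err := io.ReadFull(f, hdr[:]); err != nil {
			break // clean EOF or torn header: cut here
		}
		payload := make([]byte, binary.BigEndian.Uint32(hdr[:4]))
		if _, err := io.ReadFull(f, payload); err != nil {
			break // torn body
		}
		if crc32.Checksum(payload, table) != binary.BigEndian.Uint32(hdr[4:]) {
			break // checksum mismatch: corruption
		}
		offset += int64(len(hdr) + len(payload))
	}
	return f.Truncate(offset)
}

func main() {
	f, _ := os.CreateTemp("", "wal")
	defer os.Remove(f.Name())
	defer f.Close()

	writeRecord(f, []byte("sample-1"))
	writeRecord(f, []byte("sample-2"))
	f.Write([]byte{0xff, 0xff}) // simulate a torn write at the tail

	f.Seek(0, io.SeekStart)
	if err := recoverWAL(f); err != nil {
		panic(err)
	}
	fi, _ := f.Stat()
	fmt.Println("recovered size:", fi.Size()) // both full records survive
}
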
"2017-03-17T14:56:19Z" }, { "checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",