mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Merge pull request #303 from Bplotka/bp/better-compact-logging
repair + compact: Improved logging for easier future debug purposes.
This commit is contained in:
commit
195bc0d286
42
compact.go
42
compact.go
|
@ -14,6 +14,7 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
@ -33,7 +34,7 @@ import (
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExponentialBlockRanges returns the time ranges based on the stepSize
|
// ExponentialBlockRanges returns the time ranges based on the stepSize.
|
||||||
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
|
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
|
||||||
ranges := make([]int64, 0, steps)
|
ranges := make([]int64, 0, steps)
|
||||||
curRange := minSize
|
curRange := minSize
|
||||||
|
@ -312,9 +313,12 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
|
||||||
// Compact creates a new block in the compactor's directory from the blocks in the
|
// Compact creates a new block in the compactor's directory from the blocks in the
|
||||||
// provided directories.
|
// provided directories.
|
||||||
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) {
|
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) {
|
||||||
var blocks []BlockReader
|
var (
|
||||||
var bs []*Block
|
blocks []BlockReader
|
||||||
var metas []*BlockMeta
|
bs []*Block
|
||||||
|
metas []*BlockMeta
|
||||||
|
uids []string
|
||||||
|
)
|
||||||
|
|
||||||
for _, d := range dirs {
|
for _, d := range dirs {
|
||||||
b, err := OpenBlock(d, c.chunkPool)
|
b, err := OpenBlock(d, c.chunkPool)
|
||||||
|
@ -331,13 +335,23 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
|
||||||
metas = append(metas, meta)
|
metas = append(metas, meta)
|
||||||
blocks = append(blocks, b)
|
blocks = append(blocks, b)
|
||||||
bs = append(bs, b)
|
bs = append(bs, b)
|
||||||
|
uids = append(uids, meta.ULID.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
uid = ulid.MustNew(ulid.Now(), entropy)
|
uid = ulid.MustNew(ulid.Now(), entropy)
|
||||||
|
|
||||||
err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
|
meta := compactBlockMetas(uid, metas...)
|
||||||
|
err = c.write(dest, meta, blocks...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
level.Info(c.logger).Log(
|
||||||
|
"msg", "compact blocks",
|
||||||
|
"count", len(blocks),
|
||||||
|
"mint", meta.MinTime,
|
||||||
|
"maxt", meta.MaxTime,
|
||||||
|
"ulid", meta.ULID,
|
||||||
|
"sources", fmt.Sprintf("%v", uids),
|
||||||
|
)
|
||||||
return uid, nil
|
return uid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -365,7 +379,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (
|
||||||
meta.Compaction.Level = 1
|
meta.Compaction.Level = 1
|
||||||
meta.Compaction.Sources = []ulid.ULID{uid}
|
meta.Compaction.Sources = []ulid.ULID{uid}
|
||||||
|
|
||||||
return uid, c.write(dest, meta, b)
|
err := c.write(dest, meta, b)
|
||||||
|
if err != nil {
|
||||||
|
return uid, err
|
||||||
|
}
|
||||||
|
|
||||||
|
level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID)
|
||||||
|
return uid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// instrumentedChunkWriter is used for level 1 compactions to record statistics
|
// instrumentedChunkWriter is used for level 1 compactions to record statistics
|
||||||
|
@ -390,8 +410,6 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
|
||||||
// write creates a new block that is the union of the provided blocks into dir.
|
// write creates a new block that is the union of the provided blocks into dir.
|
||||||
// It cleans up all files of the old blocks after completing successfully.
|
// It cleans up all files of the old blocks after completing successfully.
|
||||||
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
|
||||||
level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
|
|
||||||
|
|
||||||
dir := filepath.Join(dest, meta.ULID.String())
|
dir := filepath.Join(dest, meta.ULID.String())
|
||||||
tmp := dir + ".tmp"
|
tmp := dir + ".tmp"
|
||||||
|
|
||||||
|
@ -472,7 +490,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
return errors.Wrap(err, "sync temporary dir file")
|
return errors.Wrap(err, "sync temporary dir file")
|
||||||
}
|
}
|
||||||
|
|
||||||
// close temp dir before rename block dir(for windows platform)
|
// Close temp dir before rename block dir (for windows platform).
|
||||||
if err = df.Close(); err != nil {
|
if err = df.Close(); err != nil {
|
||||||
return errors.Wrap(err, "close temporary dir")
|
return errors.Wrap(err, "close temporary dir")
|
||||||
}
|
}
|
||||||
|
@ -482,6 +500,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
if err := renameFile(tmp, dir); err != nil {
|
if err := renameFile(tmp, dir); err != nil {
|
||||||
return errors.Wrap(err, "rename block dir")
|
return errors.Wrap(err, "rename block dir")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -718,11 +737,6 @@ type compactionMerger struct {
|
||||||
intervals Intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactionSeries struct {
|
|
||||||
labels labels.Labels
|
|
||||||
chunks []*chunks.Meta
|
|
||||||
}
|
|
||||||
|
|
||||||
func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
|
func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
|
||||||
c := &compactionMerger{
|
c := &compactionMerger{
|
||||||
a: a,
|
a: a,
|
||||||
|
|
13
repair.go
13
repair.go
|
@ -36,9 +36,20 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if meta.Version == 1 {
|
if meta.Version == 1 {
|
||||||
|
level.Info(logger).Log(
|
||||||
|
"msg", "found healthy block",
|
||||||
|
"mint", meta.MinTime,
|
||||||
|
"maxt", meta.MaxTime,
|
||||||
|
"ulid", meta.ULID,
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
level.Info(logger).Log("msg", "fixing broken block", "ulid", meta.ULID)
|
level.Info(logger).Log(
|
||||||
|
"msg", "fixing broken block",
|
||||||
|
"mint", meta.MinTime,
|
||||||
|
"maxt", meta.MaxTime,
|
||||||
|
"ulid", meta.ULID,
|
||||||
|
)
|
||||||
|
|
||||||
repl, err := os.Create(filepath.Join(d, "index.repaired"))
|
repl, err := os.Create(filepath.Join(d, "index.repaired"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue