Merge pull request #1457 from prometheus/beorn7/promtool

Add a command to promtool that dumps metadata of heads.db
This commit is contained in:
Björn Rabenstein 2016-03-07 17:22:48 +01:00
commit 1bd4c92e1f
5 changed files with 342 additions and 179 deletions

View file

@ -199,7 +199,7 @@ func VersionCmd(t cli.Term, _ ...string) int {
if err := tmpl.ExecuteTemplate(&buf, "version", version.Map); err != nil {
panic(err)
}
t.Out(strings.TrimSpace(buf.String()))
fmt.Fprintln(t.Out(), strings.TrimSpace(buf.String()))
return 0
}

242
storage/local/heads.go Normal file
View file

@ -0,0 +1,242 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"os"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/codable"
)
const (
headsFileName = "heads.db"
headsTempFileName = "heads.db.tmp"
headsFormatVersion = 2
headsFormatLegacyVersion = 1 // Can read, but will never write.
headsMagicString = "PrometheusHeads"
)
// headsScanner is a scanner to read time series with their heads from a
// heads.db file. It follows a similar semantics as the bufio.Scanner.
// It is not safe to use a headsScanner concurrently.
type headsScanner struct {
f *os.File
r *bufio.Reader
fp model.Fingerprint // Read after each scan() call that has returned true.
series *memorySeries // Read after each scan() call that has returned true.
version int64 // Read after newHeadsScanner has returned.
seriesTotal uint64 // Read after newHeadsScanner has returned.
seriesCurrent uint64
chunksToPersistTotal int64 // Read after scan() has returned false.
err error // Read after scan() has returned false.
}
func newHeadsScanner(filename string) *headsScanner {
hs := &headsScanner{}
defer func() {
if hs.f != nil && hs.err != nil {
hs.f.Close()
}
}()
if hs.f, hs.err = os.Open(filename); hs.err != nil {
return hs
}
hs.r = bufio.NewReaderSize(hs.f, fileBufSize)
buf := make([]byte, len(headsMagicString))
if _, hs.err = io.ReadFull(hs.r, buf); hs.err != nil {
return hs
}
magic := string(buf)
if magic != headsMagicString {
hs.err = fmt.Errorf(
"unexpected magic string, want %q, got %q",
headsMagicString, magic,
)
return hs
}
hs.version, hs.err = binary.ReadVarint(hs.r)
if (hs.version != headsFormatVersion && hs.version != headsFormatLegacyVersion) || hs.err != nil {
hs.err = fmt.Errorf(
"unknown or unreadable heads format version, want %d, got %d, error: %s",
headsFormatVersion, hs.version, hs.err,
)
return hs
}
if hs.seriesTotal, hs.err = codable.DecodeUint64(hs.r); hs.err != nil {
return hs
}
return hs
}
// scan works like bufio.Scanner.Scan.
func (hs *headsScanner) scan() bool {
if hs.seriesCurrent == hs.seriesTotal || hs.err != nil {
return false
}
var (
seriesFlags byte
fpAsInt uint64
metric codable.Metric
persistWatermark int64
modTimeNano int64
modTime time.Time
chunkDescsOffset int64
savedFirstTime int64
numChunkDescs int64
firstTime int64
lastTime int64
encoding byte
)
if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
return false
}
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
if fpAsInt, hs.err = codable.DecodeUint64(hs.r); hs.err != nil {
return false
}
hs.fp = model.Fingerprint(fpAsInt)
if hs.err = metric.UnmarshalFromReader(hs.r); hs.err != nil {
return false
}
if hs.version != headsFormatLegacyVersion {
// persistWatermark only present in v2.
persistWatermark, hs.err = binary.ReadVarint(hs.r)
if hs.err != nil {
return false
}
modTimeNano, hs.err = binary.ReadVarint(hs.r)
if hs.err != nil {
return false
}
if modTimeNano != -1 {
modTime = time.Unix(0, modTimeNano)
}
}
if chunkDescsOffset, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
if savedFirstTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
chunkDescs := make([]*chunkDesc, numChunkDescs)
if hs.version == headsFormatLegacyVersion {
if headChunkPersisted {
persistWatermark = numChunkDescs
} else {
persistWatermark = numChunkDescs - 1
}
}
headChunkClosed := true // Initial assumption.
for i := int64(0); i < numChunkDescs; i++ {
if i < persistWatermark {
if firstTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
chunkDescs[i] = &chunkDesc{
chunkFirstTime: model.Time(firstTime),
chunkLastTime: model.Time(lastTime),
}
numMemChunkDescs.Inc()
} else {
// Non-persisted chunk.
// If there are non-persisted chunks at all, we consider
// the head chunk not to be closed yet.
headChunkClosed = false
if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
return false
}
chunk := newChunkForEncoding(chunkEncoding(encoding))
if hs.err = chunk.unmarshal(hs.r); hs.err != nil {
return false
}
cd := newChunkDesc(chunk, chunk.firstTime())
if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime.
hs.chunksToPersistTotal++
cd.maybePopulateLastTime()
}
chunkDescs[i] = cd
}
}
hs.series = &memorySeries{
metric: model.Metric(metric),
chunkDescs: chunkDescs,
persistWatermark: int(persistWatermark),
modTime: modTime,
chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: model.Time(savedFirstTime),
lastTime: chunkDescs[len(chunkDescs)-1].lastTime(),
headChunkClosed: headChunkClosed,
}
hs.seriesCurrent++
return true
}
// close closes the underlying file if required.
func (hs *headsScanner) close() {
if hs.f != nil {
hs.f.Close()
}
}
// DumpHeads writes the metadata of the provided heads file in a human-readable
// form.
func DumpHeads(filename string, out io.Writer) error {
hs := newHeadsScanner(filename)
defer hs.close()
if hs.err == nil {
fmt.Fprintf(
out,
">>> Dumping %d series from heads file %q with format version %d. <<<\n",
hs.seriesTotal, filename, hs.version,
)
}
for hs.scan() {
s := hs.series
fmt.Fprintf(
out,
"FP=%v\tMETRIC=%s\tlen(chunkDescs)=%d\tpersistWatermark=%d\tchunkDescOffset=%d\tsavedFirstTime=%v\tlastTime=%v\theadChunkClosed=%t\n",
hs.fp, s.metric, len(s.chunkDescs), s.persistWatermark, s.chunkDescsOffset, s.savedFirstTime, s.lastTime, s.headChunkClosed,
)
}
if hs.err == nil {
fmt.Fprintf(
out,
">>> Dump complete. %d chunks to persist. <<<\n",
hs.chunksToPersistTotal,
)
}
return hs.err
}

View file

@ -47,12 +47,6 @@ const (
seriesTempFileSuffix = ".db.tmp"
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
headsFileName = "heads.db"
headsTempFileName = "heads.db.tmp"
headsFormatVersion = 2
headsFormatLegacyVersion = 1 // Can read, but will never write.
headsMagicString = "PrometheusHeads"
mappingsFileName = "mappings.db"
mappingsTempFileName = "mappings.db.tmp"
mappingsFormatVersion = 1
@ -699,190 +693,36 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
// start-up while nothing else is running in storage land. This method is
// utterly goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
var chunkDescsTotal int64
fingerprintToSeries := make(map[model.Fingerprint]*memorySeries)
sm = &seriesMap{m: fingerprintToSeries}
defer func() {
if sm != nil && p.dirty {
if p.dirty {
log.Warn("Persistence layer appears dirty.")
err = p.recoverFromCrash(fingerprintToSeries)
if err != nil {
sm = nil
}
}
if err == nil {
numMemChunkDescs.Add(float64(chunkDescsTotal))
}
}()
f, err := os.Open(p.headsFileName())
if os.IsNotExist(err) {
hs := newHeadsScanner(p.headsFileName())
defer hs.close()
for hs.scan() {
fingerprintToSeries[hs.fp] = hs.series
}
if os.IsNotExist(hs.err) {
return sm, 0, nil
}
if err != nil {
log.Warn("Could not open heads file:", err)
if hs.err != nil {
p.dirty = true
return
log.
With("file", p.headsFileName()).
With("error", hs.err).
Error("Error reading heads file.")
return sm, 0, hs.err
}
defer f.Close()
r := bufio.NewReaderSize(f, fileBufSize)
buf := make([]byte, len(headsMagicString))
if _, err := io.ReadFull(r, buf); err != nil {
log.Warn("Could not read from heads file:", err)
p.dirty = true
return sm, 0, nil
}
magic := string(buf)
if magic != headsMagicString {
log.Warnf(
"unexpected magic string, want %q, got %q",
headsMagicString, magic,
)
p.dirty = true
return
}
version, err := binary.ReadVarint(r)
if (version != headsFormatVersion && version != headsFormatLegacyVersion) || err != nil {
log.Warnf("unknown heads format version, want %d", headsFormatVersion)
p.dirty = true
return sm, 0, nil
}
numSeries, err := codable.DecodeUint64(r)
if err != nil {
log.Warn("Could not decode number of series:", err)
p.dirty = true
return sm, 0, nil
}
for ; numSeries > 0; numSeries-- {
seriesFlags, err := r.ReadByte()
if err != nil {
log.Warn("Could not read series flags:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
fp, err := codable.DecodeUint64(r)
if err != nil {
log.Warn("Could not decode fingerprint:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
var metric codable.Metric
if err := metric.UnmarshalFromReader(r); err != nil {
log.Warn("Could not decode metric:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
var persistWatermark int64
var modTime time.Time
if version != headsFormatLegacyVersion {
// persistWatermark only present in v2.
persistWatermark, err = binary.ReadVarint(r)
if err != nil {
log.Warn("Could not decode persist watermark:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
modTimeNano, err := binary.ReadVarint(r)
if err != nil {
log.Warn("Could not decode modification time:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
if modTimeNano != -1 {
modTime = time.Unix(0, modTimeNano)
}
}
chunkDescsOffset, err := binary.ReadVarint(r)
if err != nil {
log.Warn("Could not decode chunk descriptor offset:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
savedFirstTime, err := binary.ReadVarint(r)
if err != nil {
log.Warn("Could not decode saved first time:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
numChunkDescs, err := binary.ReadVarint(r)
if err != nil {
log.Warn("Could not decode number of chunk descriptors:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
chunkDescs := make([]*chunkDesc, numChunkDescs)
if version == headsFormatLegacyVersion {
if headChunkPersisted {
persistWatermark = numChunkDescs
} else {
persistWatermark = numChunkDescs - 1
}
}
headChunkClosed := true // Initial assumption.
for i := int64(0); i < numChunkDescs; i++ {
if i < persistWatermark {
firstTime, err := binary.ReadVarint(r)
if err != nil {
log.Warn("Could not decode first time:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
lastTime, err := binary.ReadVarint(r)
if err != nil {
log.Warn("Could not decode last time:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
chunkDescs[i] = &chunkDesc{
chunkFirstTime: model.Time(firstTime),
chunkLastTime: model.Time(lastTime),
}
chunkDescsTotal++
} else {
// Non-persisted chunk.
// If there are non-persisted chunks at all, we consider
// the head chunk not to be closed yet.
headChunkClosed = false
encoding, err := r.ReadByte()
if err != nil {
log.Warn("Could not decode chunk type:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
chunk := newChunkForEncoding(chunkEncoding(encoding))
if err := chunk.unmarshal(r); err != nil {
log.Warn("Could not decode chunk:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
cd := newChunkDesc(chunk, chunk.firstTime())
if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime.
chunksToPersist++
cd.maybePopulateLastTime()
}
chunkDescs[i] = cd
}
}
fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{
metric: model.Metric(metric),
chunkDescs: chunkDescs,
persistWatermark: int(persistWatermark),
modTime: modTime,
chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: model.Time(savedFirstTime),
lastTime: chunkDescs[len(chunkDescs)-1].lastTime(),
headChunkClosed: headChunkClosed,
}
}
return sm, chunksToPersist, nil
return sm, hs.chunksToPersistTotal, nil
}
// dropAndPersistChunks deletes all chunks from a series file whose last sample

View file

@ -0,0 +1,76 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"fmt"
"os"
"strings"
"text/template"
"github.com/prometheus/prometheus/util/cli"
"github.com/prometheus/prometheus/version"
"github.com/prometheus/prometheus/storage/local"
)
// DumpHeadsCmd dumps metadata of a heads.db file.
func DumpHeadsCmd(t cli.Term, args ...string) int {
if len(args) != 1 {
t.Infof("usage: storagetool dump-heads <file>")
return 2
}
if err := local.DumpHeads(args[0], t.Out()); err != nil {
t.Errorf(" FAILED: %s", err)
return 1
}
return 0
}
var versionInfoTmpl = `
prometheus, version {{.version}} (branch: {{.branch}}, revision: {{.revision}})
build user: {{.buildUser}}
build date: {{.buildDate}}
go version: {{.goVersion}}
`
// VersionCmd prints the binaries version information.
func VersionCmd(t cli.Term, _ ...string) int {
tmpl := template.Must(template.New("version").Parse(versionInfoTmpl))
var buf bytes.Buffer
if err := tmpl.ExecuteTemplate(&buf, "version", version.Map); err != nil {
panic(err)
}
fmt.Fprintln(t.Out(), strings.TrimSpace(buf.String()))
return 0
}
func main() {
app := cli.NewApp("storagetool")
app.Register("dump-heads", &cli.Command{
Desc: "dump metadata of a heads.db checkpoint file",
Run: DumpHeadsCmd,
})
app.Register("version", &cli.Command{
Desc: "print the version of this binary",
Run: VersionCmd,
})
t := cli.BasicTerm(os.Stdout, os.Stderr)
os.Exit(app.Run(t, os.Args[1:]...))
}

View file

@ -32,7 +32,8 @@ type Command struct {
type Term interface {
Infof(format string, v ...interface{})
Errorf(format string, v ...interface{})
Out(format string)
Out() io.Writer
Err() io.Writer
}
type basicTerm struct {
@ -52,9 +53,13 @@ func (t *basicTerm) Errorf(format string, v ...interface{}) {
}
// Out implements Term.
func (t *basicTerm) Out(msg string) {
fmt.Fprint(t.out, msg)
fmt.Fprint(t.out, "\n")
func (t *basicTerm) Out() io.Writer {
return t.out
}
// Err implements Term.
func (t *basicTerm) Err() io.Writer {
return t.err
}
// BasicTerm returns a Term writing Infof and Errorf to err and Out to out.