mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 13:44:05 -08:00
Add a command to promtool that dumps metadata of heads.db
I needed this today for debugging. It can certainly be improved, but it's already quite helpful. I refactored the reading of heads.db files out of persistence, which is an improvement, too. I made minor changes to the cli package to allow outputting via the io.Writer interface.
This commit is contained in:
parent
6bbb4af837
commit
f193f2b8ef
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
"github.com/prometheus/prometheus/util/cli"
|
"github.com/prometheus/prometheus/util/cli"
|
||||||
"github.com/prometheus/prometheus/version"
|
"github.com/prometheus/prometheus/version"
|
||||||
)
|
)
|
||||||
|
@ -184,6 +185,19 @@ func checkRules(t cli.Term, filename string) (int, error) {
|
||||||
return len(rules), nil
|
return len(rules), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DumpHeadsCmd dumps metadata of a heads.db file.
|
||||||
|
func DumpHeadsCmd(t cli.Term, args ...string) int {
|
||||||
|
if len(args) != 1 {
|
||||||
|
t.Infof("usage: promtool dump-heads <file>")
|
||||||
|
return 2
|
||||||
|
}
|
||||||
|
if err := local.DumpHeads(args[0], t.Out()); err != nil {
|
||||||
|
t.Errorf(" FAILED: %s", err)
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
var versionInfoTmpl = `
|
var versionInfoTmpl = `
|
||||||
prometheus, version {{.version}} (branch: {{.branch}}, revision: {{.revision}})
|
prometheus, version {{.version}} (branch: {{.branch}}, revision: {{.revision}})
|
||||||
build user: {{.buildUser}}
|
build user: {{.buildUser}}
|
||||||
|
@ -199,7 +213,7 @@ func VersionCmd(t cli.Term, _ ...string) int {
|
||||||
if err := tmpl.ExecuteTemplate(&buf, "version", version.Map); err != nil {
|
if err := tmpl.ExecuteTemplate(&buf, "version", version.Map); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
t.Out(strings.TrimSpace(buf.String()))
|
fmt.Fprintln(t.Out(), strings.TrimSpace(buf.String()))
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,6 +230,11 @@ func main() {
|
||||||
Run: CheckRulesCmd,
|
Run: CheckRulesCmd,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
app.Register("dump-heads", &cli.Command{
|
||||||
|
Desc: "dump metadata of a heads.db checkpoint file",
|
||||||
|
Run: DumpHeadsCmd,
|
||||||
|
})
|
||||||
|
|
||||||
app.Register("version", &cli.Command{
|
app.Register("version", &cli.Command{
|
||||||
Desc: "print the version of this binary",
|
Desc: "print the version of this binary",
|
||||||
Run: VersionCmd,
|
Run: VersionCmd,
|
||||||
|
|
242
storage/local/heads.go
Normal file
242
storage/local/heads.go
Normal file
|
@ -0,0 +1,242 @@
|
||||||
|
// Copyright 2016 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package local
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/storage/local/codable"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
headsFileName = "heads.db"
|
||||||
|
headsTempFileName = "heads.db.tmp"
|
||||||
|
headsFormatVersion = 2
|
||||||
|
headsFormatLegacyVersion = 1 // Can read, but will never write.
|
||||||
|
headsMagicString = "PrometheusHeads"
|
||||||
|
)
|
||||||
|
|
||||||
|
// headsScanner is a scanner to read time series with their heads from a
|
||||||
|
// heads.db file. It follows a similar semantics as the bufio.Scanner.
|
||||||
|
// It is not safe to use a headsScanner concurrently.
|
||||||
|
type headsScanner struct {
|
||||||
|
f *os.File
|
||||||
|
r *bufio.Reader
|
||||||
|
fp model.Fingerprint // Read after each scan() call that has returned true.
|
||||||
|
series *memorySeries // Read after each scan() call that has returned true.
|
||||||
|
version int64 // Read after newHeadsScanner has returned.
|
||||||
|
seriesTotal uint64 // Read after newHeadsScanner has returned.
|
||||||
|
seriesCurrent uint64
|
||||||
|
chunksToPersistTotal int64 // Read after scan() has returned false.
|
||||||
|
err error // Read after scan() has returned false.
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHeadsScanner(filename string) *headsScanner {
|
||||||
|
hs := &headsScanner{}
|
||||||
|
defer func() {
|
||||||
|
if hs.f != nil && hs.err != nil {
|
||||||
|
hs.f.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if hs.f, hs.err = os.Open(filename); hs.err != nil {
|
||||||
|
return hs
|
||||||
|
}
|
||||||
|
hs.r = bufio.NewReaderSize(hs.f, fileBufSize)
|
||||||
|
|
||||||
|
buf := make([]byte, len(headsMagicString))
|
||||||
|
if _, hs.err = io.ReadFull(hs.r, buf); hs.err != nil {
|
||||||
|
return hs
|
||||||
|
}
|
||||||
|
magic := string(buf)
|
||||||
|
if magic != headsMagicString {
|
||||||
|
hs.err = fmt.Errorf(
|
||||||
|
"unexpected magic string, want %q, got %q",
|
||||||
|
headsMagicString, magic,
|
||||||
|
)
|
||||||
|
return hs
|
||||||
|
}
|
||||||
|
hs.version, hs.err = binary.ReadVarint(hs.r)
|
||||||
|
if (hs.version != headsFormatVersion && hs.version != headsFormatLegacyVersion) || hs.err != nil {
|
||||||
|
hs.err = fmt.Errorf(
|
||||||
|
"unknown or unreadable heads format version, want %d, got %d, error: %s",
|
||||||
|
headsFormatVersion, hs.version, hs.err,
|
||||||
|
)
|
||||||
|
return hs
|
||||||
|
}
|
||||||
|
if hs.seriesTotal, hs.err = codable.DecodeUint64(hs.r); hs.err != nil {
|
||||||
|
return hs
|
||||||
|
}
|
||||||
|
return hs
|
||||||
|
}
|
||||||
|
|
||||||
|
// scan works like bufio.Scanner.Scan.
|
||||||
|
func (hs *headsScanner) scan() bool {
|
||||||
|
if hs.seriesCurrent == hs.seriesTotal || hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
seriesFlags byte
|
||||||
|
fpAsInt uint64
|
||||||
|
metric codable.Metric
|
||||||
|
persistWatermark int64
|
||||||
|
modTimeNano int64
|
||||||
|
modTime time.Time
|
||||||
|
chunkDescsOffset int64
|
||||||
|
savedFirstTime int64
|
||||||
|
numChunkDescs int64
|
||||||
|
firstTime int64
|
||||||
|
lastTime int64
|
||||||
|
encoding byte
|
||||||
|
)
|
||||||
|
if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
|
||||||
|
if fpAsInt, hs.err = codable.DecodeUint64(hs.r); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
hs.fp = model.Fingerprint(fpAsInt)
|
||||||
|
|
||||||
|
if hs.err = metric.UnmarshalFromReader(hs.r); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if hs.version != headsFormatLegacyVersion {
|
||||||
|
// persistWatermark only present in v2.
|
||||||
|
persistWatermark, hs.err = binary.ReadVarint(hs.r)
|
||||||
|
if hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
modTimeNano, hs.err = binary.ReadVarint(hs.r)
|
||||||
|
if hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if modTimeNano != -1 {
|
||||||
|
modTime = time.Unix(0, modTimeNano)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if chunkDescsOffset, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if savedFirstTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
chunkDescs := make([]*chunkDesc, numChunkDescs)
|
||||||
|
if hs.version == headsFormatLegacyVersion {
|
||||||
|
if headChunkPersisted {
|
||||||
|
persistWatermark = numChunkDescs
|
||||||
|
} else {
|
||||||
|
persistWatermark = numChunkDescs - 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
headChunkClosed := true // Initial assumption.
|
||||||
|
for i := int64(0); i < numChunkDescs; i++ {
|
||||||
|
if i < persistWatermark {
|
||||||
|
if firstTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
chunkDescs[i] = &chunkDesc{
|
||||||
|
chunkFirstTime: model.Time(firstTime),
|
||||||
|
chunkLastTime: model.Time(lastTime),
|
||||||
|
}
|
||||||
|
numMemChunkDescs.Inc()
|
||||||
|
} else {
|
||||||
|
// Non-persisted chunk.
|
||||||
|
// If there are non-persisted chunks at all, we consider
|
||||||
|
// the head chunk not to be closed yet.
|
||||||
|
headChunkClosed = false
|
||||||
|
if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
chunk := newChunkForEncoding(chunkEncoding(encoding))
|
||||||
|
if hs.err = chunk.unmarshal(hs.r); hs.err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
cd := newChunkDesc(chunk, chunk.firstTime())
|
||||||
|
if i < numChunkDescs-1 {
|
||||||
|
// This is NOT the head chunk. So it's a chunk
|
||||||
|
// to be persisted, and we need to populate lastTime.
|
||||||
|
hs.chunksToPersistTotal++
|
||||||
|
cd.maybePopulateLastTime()
|
||||||
|
}
|
||||||
|
chunkDescs[i] = cd
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hs.series = &memorySeries{
|
||||||
|
metric: model.Metric(metric),
|
||||||
|
chunkDescs: chunkDescs,
|
||||||
|
persistWatermark: int(persistWatermark),
|
||||||
|
modTime: modTime,
|
||||||
|
chunkDescsOffset: int(chunkDescsOffset),
|
||||||
|
savedFirstTime: model.Time(savedFirstTime),
|
||||||
|
lastTime: chunkDescs[len(chunkDescs)-1].lastTime(),
|
||||||
|
headChunkClosed: headChunkClosed,
|
||||||
|
}
|
||||||
|
hs.seriesCurrent++
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// close closes the underlying file if required.
|
||||||
|
func (hs *headsScanner) close() {
|
||||||
|
if hs.f != nil {
|
||||||
|
hs.f.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DumpHeads writes the metadata of the provided heads file in a human-readable
|
||||||
|
// form.
|
||||||
|
func DumpHeads(filename string, out io.Writer) error {
|
||||||
|
hs := newHeadsScanner(filename)
|
||||||
|
defer hs.close()
|
||||||
|
|
||||||
|
if hs.err == nil {
|
||||||
|
fmt.Fprintf(
|
||||||
|
out,
|
||||||
|
">>> Dumping %d series from heads file %q with format version %d. <<<\n",
|
||||||
|
hs.seriesTotal, filename, hs.version,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
for hs.scan() {
|
||||||
|
s := hs.series
|
||||||
|
fmt.Fprintf(
|
||||||
|
out,
|
||||||
|
"FP=%v\tMETRIC=%s\tlen(chunkDescs)=%d\tpersistWatermark=%d\tchunkDescOffset=%d\tsavedFirstTime=%v\tlastTime=%v\theadChunkClosed=%t\n",
|
||||||
|
hs.fp, s.metric, len(s.chunkDescs), s.persistWatermark, s.chunkDescsOffset, s.savedFirstTime, s.lastTime, s.headChunkClosed,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
if hs.err == nil {
|
||||||
|
fmt.Fprintf(
|
||||||
|
out,
|
||||||
|
">>> Dump complete. %d chunks to persist. <<<\n",
|
||||||
|
hs.chunksToPersistTotal,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return hs.err
|
||||||
|
}
|
|
@ -47,12 +47,6 @@ const (
|
||||||
seriesTempFileSuffix = ".db.tmp"
|
seriesTempFileSuffix = ".db.tmp"
|
||||||
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
|
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
|
||||||
|
|
||||||
headsFileName = "heads.db"
|
|
||||||
headsTempFileName = "heads.db.tmp"
|
|
||||||
headsFormatVersion = 2
|
|
||||||
headsFormatLegacyVersion = 1 // Can read, but will never write.
|
|
||||||
headsMagicString = "PrometheusHeads"
|
|
||||||
|
|
||||||
mappingsFileName = "mappings.db"
|
mappingsFileName = "mappings.db"
|
||||||
mappingsTempFileName = "mappings.db.tmp"
|
mappingsTempFileName = "mappings.db.tmp"
|
||||||
mappingsFormatVersion = 1
|
mappingsFormatVersion = 1
|
||||||
|
@ -699,190 +693,36 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
|
||||||
// start-up while nothing else is running in storage land. This method is
|
// start-up while nothing else is running in storage land. This method is
|
||||||
// utterly goroutine-unsafe.
|
// utterly goroutine-unsafe.
|
||||||
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
|
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
|
||||||
var chunkDescsTotal int64
|
|
||||||
fingerprintToSeries := make(map[model.Fingerprint]*memorySeries)
|
fingerprintToSeries := make(map[model.Fingerprint]*memorySeries)
|
||||||
sm = &seriesMap{m: fingerprintToSeries}
|
sm = &seriesMap{m: fingerprintToSeries}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if sm != nil && p.dirty {
|
if p.dirty {
|
||||||
log.Warn("Persistence layer appears dirty.")
|
log.Warn("Persistence layer appears dirty.")
|
||||||
err = p.recoverFromCrash(fingerprintToSeries)
|
err = p.recoverFromCrash(fingerprintToSeries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sm = nil
|
sm = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == nil {
|
|
||||||
numMemChunkDescs.Add(float64(chunkDescsTotal))
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
f, err := os.Open(p.headsFileName())
|
hs := newHeadsScanner(p.headsFileName())
|
||||||
if os.IsNotExist(err) {
|
defer hs.close()
|
||||||
|
for hs.scan() {
|
||||||
|
fingerprintToSeries[hs.fp] = hs.series
|
||||||
|
}
|
||||||
|
if os.IsNotExist(hs.err) {
|
||||||
return sm, 0, nil
|
return sm, 0, nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if hs.err != nil {
|
||||||
log.Warn("Could not open heads file:", err)
|
|
||||||
p.dirty = true
|
p.dirty = true
|
||||||
return
|
log.
|
||||||
|
With("file", p.headsFileName()).
|
||||||
|
With("error", hs.err).
|
||||||
|
Error("Error reading heads file.")
|
||||||
|
return sm, 0, hs.err
|
||||||
}
|
}
|
||||||
defer f.Close()
|
return sm, hs.chunksToPersistTotal, nil
|
||||||
r := bufio.NewReaderSize(f, fileBufSize)
|
|
||||||
|
|
||||||
buf := make([]byte, len(headsMagicString))
|
|
||||||
if _, err := io.ReadFull(r, buf); err != nil {
|
|
||||||
log.Warn("Could not read from heads file:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, 0, nil
|
|
||||||
}
|
|
||||||
magic := string(buf)
|
|
||||||
if magic != headsMagicString {
|
|
||||||
log.Warnf(
|
|
||||||
"unexpected magic string, want %q, got %q",
|
|
||||||
headsMagicString, magic,
|
|
||||||
)
|
|
||||||
p.dirty = true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
version, err := binary.ReadVarint(r)
|
|
||||||
if (version != headsFormatVersion && version != headsFormatLegacyVersion) || err != nil {
|
|
||||||
log.Warnf("unknown heads format version, want %d", headsFormatVersion)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, 0, nil
|
|
||||||
}
|
|
||||||
numSeries, err := codable.DecodeUint64(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode number of series:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for ; numSeries > 0; numSeries-- {
|
|
||||||
seriesFlags, err := r.ReadByte()
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not read series flags:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
|
|
||||||
fp, err := codable.DecodeUint64(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode fingerprint:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
var metric codable.Metric
|
|
||||||
if err := metric.UnmarshalFromReader(r); err != nil {
|
|
||||||
log.Warn("Could not decode metric:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
var persistWatermark int64
|
|
||||||
var modTime time.Time
|
|
||||||
if version != headsFormatLegacyVersion {
|
|
||||||
// persistWatermark only present in v2.
|
|
||||||
persistWatermark, err = binary.ReadVarint(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode persist watermark:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
modTimeNano, err := binary.ReadVarint(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode modification time:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
if modTimeNano != -1 {
|
|
||||||
modTime = time.Unix(0, modTimeNano)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
chunkDescsOffset, err := binary.ReadVarint(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode chunk descriptor offset:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
savedFirstTime, err := binary.ReadVarint(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode saved first time:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
numChunkDescs, err := binary.ReadVarint(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode number of chunk descriptors:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
chunkDescs := make([]*chunkDesc, numChunkDescs)
|
|
||||||
if version == headsFormatLegacyVersion {
|
|
||||||
if headChunkPersisted {
|
|
||||||
persistWatermark = numChunkDescs
|
|
||||||
} else {
|
|
||||||
persistWatermark = numChunkDescs - 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
headChunkClosed := true // Initial assumption.
|
|
||||||
for i := int64(0); i < numChunkDescs; i++ {
|
|
||||||
if i < persistWatermark {
|
|
||||||
firstTime, err := binary.ReadVarint(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode first time:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
lastTime, err := binary.ReadVarint(r)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode last time:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
chunkDescs[i] = &chunkDesc{
|
|
||||||
chunkFirstTime: model.Time(firstTime),
|
|
||||||
chunkLastTime: model.Time(lastTime),
|
|
||||||
}
|
|
||||||
chunkDescsTotal++
|
|
||||||
} else {
|
|
||||||
// Non-persisted chunk.
|
|
||||||
// If there are non-persisted chunks at all, we consider
|
|
||||||
// the head chunk not to be closed yet.
|
|
||||||
headChunkClosed = false
|
|
||||||
encoding, err := r.ReadByte()
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Could not decode chunk type:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
chunk := newChunkForEncoding(chunkEncoding(encoding))
|
|
||||||
if err := chunk.unmarshal(r); err != nil {
|
|
||||||
log.Warn("Could not decode chunk:", err)
|
|
||||||
p.dirty = true
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
|
||||||
cd := newChunkDesc(chunk, chunk.firstTime())
|
|
||||||
if i < numChunkDescs-1 {
|
|
||||||
// This is NOT the head chunk. So it's a chunk
|
|
||||||
// to be persisted, and we need to populate lastTime.
|
|
||||||
chunksToPersist++
|
|
||||||
cd.maybePopulateLastTime()
|
|
||||||
}
|
|
||||||
chunkDescs[i] = cd
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{
|
|
||||||
metric: model.Metric(metric),
|
|
||||||
chunkDescs: chunkDescs,
|
|
||||||
persistWatermark: int(persistWatermark),
|
|
||||||
modTime: modTime,
|
|
||||||
chunkDescsOffset: int(chunkDescsOffset),
|
|
||||||
savedFirstTime: model.Time(savedFirstTime),
|
|
||||||
lastTime: chunkDescs[len(chunkDescs)-1].lastTime(),
|
|
||||||
headChunkClosed: headChunkClosed,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sm, chunksToPersist, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// dropAndPersistChunks deletes all chunks from a series file whose last sample
|
// dropAndPersistChunks deletes all chunks from a series file whose last sample
|
||||||
|
|
|
@ -32,7 +32,8 @@ type Command struct {
|
||||||
type Term interface {
|
type Term interface {
|
||||||
Infof(format string, v ...interface{})
|
Infof(format string, v ...interface{})
|
||||||
Errorf(format string, v ...interface{})
|
Errorf(format string, v ...interface{})
|
||||||
Out(format string)
|
Out() io.Writer
|
||||||
|
Err() io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
type basicTerm struct {
|
type basicTerm struct {
|
||||||
|
@ -52,9 +53,13 @@ func (t *basicTerm) Errorf(format string, v ...interface{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Out implements Term.
|
// Out implements Term.
|
||||||
func (t *basicTerm) Out(msg string) {
|
func (t *basicTerm) Out() io.Writer {
|
||||||
fmt.Fprint(t.out, msg)
|
return t.out
|
||||||
fmt.Fprint(t.out, "\n")
|
}
|
||||||
|
|
||||||
|
// Err implements Term.
|
||||||
|
func (t *basicTerm) Err() io.Writer {
|
||||||
|
return t.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// BasicTerm returns a Term writing Infof and Errorf to err and Out to out.
|
// BasicTerm returns a Term writing Infof and Errorf to err and Out to out.
|
||||||
|
|
Loading…
Reference in a new issue