// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/storage/local/codable"
)

const (
	headsFileName            = "heads.db"
	headsTempFileName        = "heads.db.tmp"
	headsFormatVersion       = 2
	headsFormatLegacyVersion = 1 // Can read, but will never write.
	headsMagicString         = "PrometheusHeads"
)

// headsScanner is a scanner to read time series with their heads from a
// heads.db file. Its semantics are similar to those of bufio.Scanner.
// It is not safe to use a headsScanner concurrently.
type headsScanner struct {
	f                    *os.File
	r                    *bufio.Reader
	fp                   model.Fingerprint // Read after each scan() call that has returned true.
	series               *memorySeries     // Read after each scan() call that has returned true.
	version              int64             // Read after newHeadsScanner has returned.
	seriesTotal          uint64            // Read after newHeadsScanner has returned.
	seriesCurrent        uint64
	chunksToPersistTotal int64 // Read after scan() has returned false.
	err                  error // Read after scan() has returned false.
}

func newHeadsScanner(filename string) *headsScanner {
	hs := &headsScanner{}
	defer func() {
		if hs.f != nil && hs.err != nil {
			hs.f.Close()
		}
	}()

	if hs.f, hs.err = os.Open(filename); hs.err != nil {
		return hs
	}
	hs.r = bufio.NewReaderSize(hs.f, fileBufSize)

	buf := make([]byte, len(headsMagicString))
	if _, hs.err = io.ReadFull(hs.r, buf); hs.err != nil {
		return hs
	}
	magic := string(buf)
	if magic != headsMagicString {
		hs.err = fmt.Errorf(
			"unexpected magic string, want %q, got %q",
			headsMagicString, magic,
		)
		return hs
	}
	hs.version, hs.err = binary.ReadVarint(hs.r)
	if (hs.version != headsFormatVersion && hs.version != headsFormatLegacyVersion) || hs.err != nil {
		hs.err = fmt.Errorf(
			"unknown or unreadable heads format version, want %d, got %d, error: %s",
			headsFormatVersion, hs.version, hs.err,
		)
		return hs
	}
	if hs.seriesTotal, hs.err = codable.DecodeUint64(hs.r); hs.err != nil {
		return hs
	}
	return hs
}

// scan works like bufio.Scanner.Scan.
func (hs *headsScanner) scan() bool {
	if hs.seriesCurrent == hs.seriesTotal || hs.err != nil {
		return false
	}

	var (
		seriesFlags      byte
		fpAsInt          uint64
		metric           codable.Metric
		persistWatermark int64
		modTimeNano      int64
		modTime          time.Time
		chunkDescsOffset int64
		savedFirstTime   int64
		numChunkDescs    int64
		firstTime        int64
		lastTime         int64
		encoding         byte
	)
	if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
		return false
	}
	headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
	if fpAsInt, hs.err = codable.DecodeUint64(hs.r); hs.err != nil {
		return false
	}
	hs.fp = model.Fingerprint(fpAsInt)
	if hs.err = metric.UnmarshalFromReader(hs.r); hs.err != nil {
		return false
	}
	if hs.version != headsFormatLegacyVersion {
		// persistWatermark only present in v2.
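		// The persist watermark counts how many of the chunk
		// descriptors read below describe already-persisted chunks.
		// It is followed by the series' modification time in
		// nanoseconds since the epoch; -1 means unknown, in which
		// case modTime keeps its zero value.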
		persistWatermark, hs.err = binary.ReadVarint(hs.r)
		if hs.err != nil {
			return false
		}
		modTimeNano, hs.err = binary.ReadVarint(hs.r)
		if hs.err != nil {
			return false
		}
		if modTimeNano != -1 {
			modTime = time.Unix(0, modTimeNano)
		}
	}
	if chunkDescsOffset, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
		return false
	}
	if savedFirstTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
		return false
	}
	if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
		return false
	}
	chunkDescs := make([]*chunkDesc, numChunkDescs)
	if hs.version == headsFormatLegacyVersion {
		if headChunkPersisted {
			persistWatermark = numChunkDescs
		} else {
			persistWatermark = numChunkDescs - 1
		}
	}
	headChunkClosed := true // Initial assumption.

	for i := int64(0); i < numChunkDescs; i++ {
		if i < persistWatermark {
			if firstTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
				return false
			}
			if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
				return false
			}
			chunkDescs[i] = &chunkDesc{
				chunkFirstTime: model.Time(firstTime),
				chunkLastTime:  model.Time(lastTime),
			}
			numMemChunkDescs.Inc()
		} else {
			// Non-persisted chunk.
			// If there are non-persisted chunks at all, we consider
			// the head chunk not to be closed yet.
			headChunkClosed = false
			if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
				return false
			}
			chunk := newChunkForEncoding(chunkEncoding(encoding))
			if hs.err = chunk.unmarshal(hs.r); hs.err != nil {
				return false
			}
			cd := newChunkDesc(chunk, chunk.firstTime())
			if i < numChunkDescs-1 {
				// This is NOT the head chunk. So it's a chunk
				// to be persisted, and we need to populate lastTime.
				hs.chunksToPersistTotal++
				cd.maybePopulateLastTime()
			}
			chunkDescs[i] = cd
		}
	}

	hs.series = &memorySeries{
		metric:           model.Metric(metric),
		chunkDescs:       chunkDescs,
		persistWatermark: int(persistWatermark),
		modTime:          modTime,
		chunkDescsOffset: int(chunkDescsOffset),
		savedFirstTime:   model.Time(savedFirstTime),
		lastTime:         chunkDescs[len(chunkDescs)-1].lastTime(),
		headChunkClosed:  headChunkClosed,
	}
	hs.seriesCurrent++
	return true
}

// close closes the underlying file if required.
func (hs *headsScanner) close() {
	if hs.f != nil {
		hs.f.Close()
	}
}

// DumpHeads writes the metadata of the provided heads file in a human-readable
// form.
func DumpHeads(filename string, out io.Writer) error {
	hs := newHeadsScanner(filename)
	defer hs.close()

	if hs.err == nil {
		fmt.Fprintf(
			out,
			">>> Dumping %d series from heads file %q with format version %d. <<<\n",
			hs.seriesTotal, filename, hs.version,
		)
	}
	for hs.scan() {
		s := hs.series
		fmt.Fprintf(
			out,
			"FP=%v\tMETRIC=%s\tlen(chunkDescs)=%d\tpersistWatermark=%d\tchunkDescsOffset=%d\tsavedFirstTime=%v\tlastTime=%v\theadChunkClosed=%t\n",
			hs.fp, s.metric, len(s.chunkDescs), s.persistWatermark, s.chunkDescsOffset,
			s.savedFirstTime, s.lastTime, s.headChunkClosed,
		)
	}
	if hs.err == nil {
		fmt.Fprintf(
			out,
			">>> Dump complete. %d chunks to persist. <<<\n",
			hs.chunksToPersistTotal,
		)
	}
	return hs.err
}
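
// A minimal usage sketch of the scanner above, analogous to the loop in
// DumpHeads (the file path is hypothetical; callers would point it at the
// heads.db inside the storage directory):
//
//	hs := newHeadsScanner("data/heads.db")
//	defer hs.close()
//	for hs.scan() {
//		// hs.fp and hs.series are valid here.
//	}
//	if hs.err != nil {
//		// Either opening the file or decoding a series failed.
//	}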