mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
commit
bc49a665d1
6
block.go
6
block.go
|
@ -195,7 +195,7 @@ func readMetaFile(dir string) (*BlockMeta, error) {
|
|||
if err := json.Unmarshal(b, &m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if m.Version != 1 && m.Version != 2 {
|
||||
if m.Version != 1 {
|
||||
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
|
||||
}
|
||||
|
||||
|
@ -203,6 +203,8 @@ func readMetaFile(dir string) (*BlockMeta, error) {
|
|||
}
|
||||
|
||||
func writeMetaFile(dir string, meta *BlockMeta) error {
|
||||
meta.Version = 1
|
||||
|
||||
// Make any changes to the file appear atomic.
|
||||
path := filepath.Join(dir, metaFilename)
|
||||
tmp := path + ".tmp"
|
||||
|
@ -253,7 +255,7 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ir, err := index.NewFileReader(filepath.Join(dir, "index"), meta.Version)
|
||||
ir, err := index.NewFileReader(filepath.Join(dir, "index"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -23,6 +23,21 @@ import (
|
|||
"github.com/prometheus/tsdb/testutil"
|
||||
)
|
||||
|
||||
// In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped
|
||||
// to 2. We had a migration in place resetting it to 1 but we should move immediately to
|
||||
// version 3 next time to avoid confusion and issues.
|
||||
func TestBlockMetaMustNeverBeVersion2(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "metaversion")
|
||||
testutil.Ok(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
testutil.Ok(t, writeMetaFile(dir, &BlockMeta{}))
|
||||
|
||||
meta, err := readMetaFile(dir)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Assert(t, meta.Version != 2, "meta.json version must never be 2")
|
||||
}
|
||||
|
||||
func TestSetCompactionFailed(t *testing.T) {
|
||||
tmpdir, err := ioutil.TempDir("", "test-tsdb")
|
||||
testutil.Ok(t, err)
|
||||
|
|
|
@ -431,7 +431,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
|||
if err != nil {
|
||||
return errors.Wrap(err, "open index writer")
|
||||
}
|
||||
meta.Version = indexw.Version
|
||||
|
||||
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
|
||||
return errors.Wrap(err, "write compaction")
|
||||
|
|
4
db.go
4
db.go
|
@ -188,6 +188,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
if opts == nil {
|
||||
opts = DefaultOptions
|
||||
}
|
||||
// Fixup bad format written by Prometheus 2.1.
|
||||
if err := repairBadIndexVersion(l, dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db = &DB{
|
||||
dir: dir,
|
||||
|
|
|
@ -37,6 +37,7 @@ const (
|
|||
MagicIndex = 0xBAAAD700
|
||||
|
||||
indexFormatV1 = 1
|
||||
indexFormatV2 = 2
|
||||
)
|
||||
|
||||
type indexWriterSeries struct {
|
||||
|
@ -135,7 +136,7 @@ type indexTOC struct {
|
|||
postingsTable uint64
|
||||
}
|
||||
|
||||
// NewWriter returns a new Writer to the given filename.
|
||||
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
|
||||
func NewWriter(fn string) (*Writer, error) {
|
||||
dir := filepath.Dir(fn)
|
||||
|
||||
|
@ -168,8 +169,6 @@ func NewWriter(fn string) (*Writer, error) {
|
|||
symbols: make(map[string]uint32, 1<<13),
|
||||
seriesOffsets: make(map[uint64]uint64, 1<<16),
|
||||
crc32: newCRC32(),
|
||||
|
||||
Version: 2,
|
||||
}
|
||||
if err := iw.writeMeta(); err != nil {
|
||||
return nil, err
|
||||
|
@ -195,7 +194,7 @@ func (w *Writer) write(bufs ...[]byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// addPadding adds zero byte padding until the file size is a multiple size_unit.
|
||||
// addPadding adds zero byte padding until the file size is a multiple size.
|
||||
func (w *Writer) addPadding(size int) error {
|
||||
p := w.pos % uint64(size)
|
||||
if p == 0 {
|
||||
|
@ -249,7 +248,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
|||
func (w *Writer) writeMeta() error {
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32(MagicIndex)
|
||||
w.buf1.putByte(indexFormatV1)
|
||||
w.buf1.putByte(indexFormatV2)
|
||||
|
||||
return w.write(w.buf1.get())
|
||||
}
|
||||
|
@ -266,7 +265,13 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
|
|||
if _, ok := w.seriesOffsets[ref]; ok {
|
||||
return errors.Errorf("series with reference %d already added", ref)
|
||||
}
|
||||
// We add padding to 16 bytes to increase the addressable space we get through 4 byte
|
||||
// series references.
|
||||
w.addPadding(16)
|
||||
|
||||
if w.pos%16 != 0 {
|
||||
return errors.Errorf("series write not 16-byte aligned at %d", w.pos)
|
||||
}
|
||||
w.seriesOffsets[ref] = w.pos / 16
|
||||
|
||||
w.buf2.reset()
|
||||
|
@ -572,25 +577,22 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
|
|||
return b[start:end]
|
||||
}
|
||||
|
||||
// NewReader returns a new IndexReader on the given byte slice.
|
||||
func NewReader(b ByteSlice, version int) (*Reader, error) {
|
||||
return newReader(b, nil, version)
|
||||
// NewReader returns a new IndexReader on the given byte slice. It automatically
|
||||
// handles different format versions.
|
||||
func NewReader(b ByteSlice) (*Reader, error) {
|
||||
return newReader(b, nil)
|
||||
}
|
||||
|
||||
// NewFileReader returns a new index reader against the given index file.
|
||||
func NewFileReader(path string, version int) (*Reader, error) {
|
||||
func NewFileReader(path string) (*Reader, error) {
|
||||
f, err := fileutil.OpenMmapFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newReader(realByteSlice(f.Bytes()), f, version)
|
||||
return newReader(realByteSlice(f.Bytes()), f)
|
||||
}
|
||||
|
||||
func newReader(b ByteSlice, c io.Closer, version int) (*Reader, error) {
|
||||
if version != 1 && version != 2 {
|
||||
return nil, errors.Errorf("unexpected file version %d", version)
|
||||
}
|
||||
|
||||
func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
||||
r := &Reader{
|
||||
b: b,
|
||||
c: c,
|
||||
|
@ -598,16 +600,20 @@ func newReader(b ByteSlice, c io.Closer, version int) (*Reader, error) {
|
|||
labels: map[string]uint32{},
|
||||
postings: map[labels.Label]uint32{},
|
||||
crc32: newCRC32(),
|
||||
version: version,
|
||||
}
|
||||
|
||||
// Verify magic number.
|
||||
if b.Len() < 4 {
|
||||
// Verify header.
|
||||
if b.Len() < 5 {
|
||||
return nil, errors.Wrap(errInvalidSize, "index header")
|
||||
}
|
||||
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
|
||||
return nil, errors.Errorf("invalid magic number %x", m)
|
||||
}
|
||||
r.version = int(r.b.Range(4, 5)[0])
|
||||
|
||||
if r.version != 1 && r.version != 2 {
|
||||
return nil, errors.Errorf("unknown index file version %d", r.version)
|
||||
}
|
||||
|
||||
if err := r.readTOC(); err != nil {
|
||||
return nil, errors.Wrap(err, "read TOC")
|
||||
|
@ -880,8 +886,10 @@ func (r *Reader) LabelIndices() ([][]string, error) {
|
|||
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
|
||||
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
||||
offset := id
|
||||
// In version 2 series IDs are no longer exact references but series are 16-byte padded
|
||||
// and the ID is the multiple of 16 of the actual position.
|
||||
if r.version == 2 {
|
||||
offset = 16 * id
|
||||
offset = id * 16
|
||||
}
|
||||
d := r.decbufUvarintAt(int(offset))
|
||||
if d.err() != nil {
|
||||
|
|
|
@ -160,7 +160,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, iw.Close())
|
||||
|
||||
ir, err := NewFileReader(fn, 1)
|
||||
ir, err := NewFileReader(fn)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, ir.Close())
|
||||
|
||||
|
@ -170,7 +170,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
|
|||
_, err = f.WriteAt([]byte{0, 0}, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
_, err = NewFileReader(dir, 1)
|
||||
_, err = NewFileReader(dir)
|
||||
testutil.NotOk(t, err)
|
||||
}
|
||||
|
||||
|
@ -213,7 +213,7 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
|
||||
testutil.Ok(t, iw.Close())
|
||||
|
||||
ir, err := NewFileReader(fn, 2)
|
||||
ir, err := NewFileReader(fn)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
p, err := ir.Postings("a", "1")
|
||||
|
@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
err = iw.Close()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
ir, err := NewFileReader(filepath.Join(dir, "index"), 2)
|
||||
ir, err := NewFileReader(filepath.Join(dir, "index"))
|
||||
testutil.Ok(t, err)
|
||||
|
||||
for p := range mi.postings {
|
||||
|
|
90
repair.go
Normal file
90
repair.go
Normal file
|
@ -0,0 +1,90 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/oklog/ulid"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/tsdb/fileutil"
|
||||
)
|
||||
|
||||
// repairBadIndexVersion repairs an issue in index and meta.json persistence introduced in
|
||||
// commit 129773b41a565fde5156301e37f9a87158030443.
|
||||
func repairBadIndexVersion(logger log.Logger, dir string) error {
|
||||
// All blocks written by Prometheus 2.1 with a meta.json version of 2 are affected.
|
||||
// We must actually set the index file version to 2 and revert the meta.json version back to 1.
|
||||
subdirs, err := fileutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, d := range subdirs {
|
||||
// Skip non-block dirs.
|
||||
if _, err := ulid.Parse(d); err != nil {
|
||||
continue
|
||||
}
|
||||
d = path.Join(dir, d)
|
||||
|
||||
meta, err := readBogusMetaFile(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if meta.Version == 1 {
|
||||
continue
|
||||
}
|
||||
level.Info(logger).Log("msg", "fixing broken block", "ulid", meta.ULID)
|
||||
|
||||
repl, err := os.Create(filepath.Join(d, "index.repaired"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
broken, err := os.Open(filepath.Join(d, "index"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.Copy(repl, broken); err != nil {
|
||||
return err
|
||||
}
|
||||
// Set the 5th byte to 2 to indiciate the correct file format version.
|
||||
if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := fileutil.Fsync(repl); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := repl.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := renameFile(repl.Name(), broken.Name()); err != nil {
|
||||
return err
|
||||
}
|
||||
// Reset version of meta.json to 1.
|
||||
meta.Version = 1
|
||||
if err := writeMetaFile(d, meta); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func readBogusMetaFile(dir string) (*BlockMeta, error) {
|
||||
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var m BlockMeta
|
||||
|
||||
if err := json.Unmarshal(b, &m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if m.Version != 1 && m.Version != 2 {
|
||||
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
|
||||
}
|
||||
return &m, nil
|
||||
}
|
122
repair_test.go
Normal file
122
repair_test.go
Normal file
|
@ -0,0 +1,122 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/tsdb/chunks"
|
||||
|
||||
"github.com/prometheus/tsdb/index"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
)
|
||||
|
||||
func TestRepairBadIndexVersion(t *testing.T) {
|
||||
// The broken index used in this test was written by the following script
|
||||
// at a broken revision.
|
||||
//
|
||||
// func main() {
|
||||
// w, err := index.NewWriter("index")
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// err = w.AddSymbols(map[string]struct{}{
|
||||
// "a": struct{}{},
|
||||
// "b": struct{}{},
|
||||
// "1": struct{}{},
|
||||
// "2": struct{}{},
|
||||
// })
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// err = w.AddSeries(1, labels.FromStrings("a", "1", "b", "1"))
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// err = w.AddSeries(2, labels.FromStrings("a", "2", "b", "1"))
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// err = w.WritePostings("b", "1", index.NewListPostings([]uint64{1, 2}))
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// if err := w.Close(); err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// In its current state, lookups should fail with the fixed code.
|
||||
const dir = "testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/"
|
||||
meta, err := readMetaFile(dir)
|
||||
if err == nil {
|
||||
t.Fatal("error expected but got none")
|
||||
}
|
||||
// Touch chunks dir in block.
|
||||
os.MkdirAll(dir+"chunks", 0777)
|
||||
|
||||
r, err := index.NewFileReader(dir + "index")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
p, err := r.Postings("b", "1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for p.Next() {
|
||||
t.Logf("next ID %d", p.At())
|
||||
|
||||
var lset labels.Labels
|
||||
if err := r.Series(p.At(), &lset, nil); err == nil {
|
||||
t.Fatal("expected error but got none")
|
||||
}
|
||||
}
|
||||
if p.Err() != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// On DB opening all blocks in the base dir should be repaired.
|
||||
db, _ := Open("testdata/repair_index_version", nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
db.Close()
|
||||
|
||||
r, err = index.NewFileReader(dir + "index")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
p, err = r.Postings("b", "1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
res := []labels.Labels{}
|
||||
|
||||
for p.Next() {
|
||||
t.Logf("next ID %d", p.At())
|
||||
|
||||
var lset labels.Labels
|
||||
var chks []chunks.Meta
|
||||
if err := r.Series(p.At(), &lset, &chks); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
res = append(res, lset)
|
||||
}
|
||||
if p.Err() != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(res, []labels.Labels{
|
||||
{{"a", "1"}, {"b", "1"}},
|
||||
{{"a", "2"}, {"b", "1"}},
|
||||
}) {
|
||||
t.Fatalf("unexpected result %v", res)
|
||||
}
|
||||
|
||||
meta, err = readMetaFile(dir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if meta.Version != 1 {
|
||||
t.Fatalf("unexpected meta version %d", meta.Version)
|
||||
}
|
||||
}
|
BIN
testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/index
vendored
Normal file
BIN
testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/index
vendored
Normal file
Binary file not shown.
17
testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/meta.json
vendored
Normal file
17
testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/meta.json
vendored
Normal file
|
@ -0,0 +1,17 @@
|
|||
{
|
||||
"version": 2,
|
||||
"ulid": "01BZJ9WJR6Z192734YNMD62F6M",
|
||||
"minTime": 1511366400000,
|
||||
"maxTime": 1511368200000,
|
||||
"stats": {
|
||||
"numSamples": 31897565,
|
||||
"numSeries": 88910,
|
||||
"numChunks": 266093
|
||||
},
|
||||
"compaction": {
|
||||
"level": 1,
|
||||
"sources": [
|
||||
"01BZJ9WJR6Z192734YNMD62F6M"
|
||||
]
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue