Merge pull request #248 from shubheksha/fix/238

Align series to 8/16 byte padding to increase addressable space
This commit is contained in:
Goutham Veeramachaneni 2018-01-13 12:25:28 +05:30 committed by GitHub
commit 8d373c763b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 54 additions and 37 deletions

View file

@ -151,6 +151,9 @@ type BlockMeta struct {
// Information on compactions the block was created from. // Information on compactions the block was created from.
Compaction BlockMetaCompaction `json:"compaction"` Compaction BlockMetaCompaction `json:"compaction"`
// Version of the index format.
Version int `json:"version"`
} }
// BlockStats contains stats about contents of a block. // BlockStats contains stats about contents of a block.
@ -176,12 +179,6 @@ const (
flagStd = 1 flagStd = 1
) )
type blockMeta struct {
Version int `json:"version"`
*BlockMeta
}
const indexFilename = "index" const indexFilename = "index"
const metaFilename = "meta.json" const metaFilename = "meta.json"
@ -193,16 +190,16 @@ func readMetaFile(dir string) (*BlockMeta, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
var m blockMeta var m BlockMeta
if err := json.Unmarshal(b, &m); err != nil { if err := json.Unmarshal(b, &m); err != nil {
return nil, err return nil, err
} }
if m.Version != 1 { if m.Version != 1 && m.Version != 2 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version) return nil, errors.Errorf("unexpected meta file version %d", m.Version)
} }
return m.BlockMeta, nil return &m, nil
} }
func writeMetaFile(dir string, meta *BlockMeta) error { func writeMetaFile(dir string, meta *BlockMeta) error {
@ -219,7 +216,8 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
enc.SetIndent("", "\t") enc.SetIndent("", "\t")
var merr MultiError var merr MultiError
if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil {
if merr.Add(enc.Encode(meta)); merr.Err() != nil {
merr.Add(f.Close()) merr.Add(f.Close())
return merr.Err() return merr.Err()
} }
@ -255,7 +253,7 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ir, err := index.NewFileReader(filepath.Join(dir, "index")) ir, err := index.NewFileReader(filepath.Join(dir, "index"), meta.Version)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -42,7 +42,7 @@ func TestSetCompactionFailed(t *testing.T) {
func createEmptyBlock(t *testing.T, dir string) *Block { func createEmptyBlock(t *testing.T, dir string) *Block {
testutil.Ok(t, os.MkdirAll(dir, 0777)) testutil.Ok(t, os.MkdirAll(dir, 0777))
testutil.Ok(t, writeMetaFile(dir, &BlockMeta{})) testutil.Ok(t, writeMetaFile(dir, &BlockMeta{Version: 2}))
ir, err := index.NewWriter(filepath.Join(dir, indexFilename)) ir, err := index.NewWriter(filepath.Join(dir, indexFilename))
testutil.Ok(t, err) testutil.Ok(t, err)

View file

@ -428,6 +428,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
} }
indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename)) indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename))
meta.Version = indexw.Version
if err != nil { if err != nil {
return errors.Wrap(err, "open index writer") return errors.Wrap(err, "open index writer")
} }

View file

@ -65,7 +65,7 @@ Strings are referenced by pointing to the beginning of their length field. The s
### Series ### Series
The section contains a sequence of series that hold the label set of the series as well as its chunks within the block. The series are sorted lexicographically by their label sets. The section contains a sequence of series that hold the label set of the series as well as its chunks within the block. The series are sorted lexicographically by their label sets.
The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets. Each series section is aligned to 16 bytes. The ID of a series is its file offset divided by 16 (`offset/16`), and this ID is used in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets.
``` ```
┌───────────────────────────────────────┐ ┌───────────────────────────────────────┐

View file

@ -98,7 +98,7 @@ func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable) return crc32.New(castagnoliTable)
} }
// indexWriter implements the IndexWriter interface for the standard // Writer implements the IndexWriter interface for the standard
// serialization format. // serialization format.
type Writer struct { type Writer struct {
f *os.File f *os.File
@ -122,6 +122,8 @@ type Writer struct {
lastSeries labels.Labels lastSeries labels.Labels
crc32 hash.Hash crc32 hash.Hash
Version int
} }
type indexTOC struct { type indexTOC struct {
@ -166,6 +168,8 @@ func NewWriter(fn string) (*Writer, error) {
symbols: make(map[string]uint32, 1<<13), symbols: make(map[string]uint32, 1<<13),
seriesOffsets: make(map[uint64]uint64, 1<<16), seriesOffsets: make(map[uint64]uint64, 1<<16),
crc32: newCRC32(), crc32: newCRC32(),
Version: 2,
} }
if err := iw.writeMeta(); err != nil { if err := iw.writeMeta(); err != nil {
return nil, err return nil, err
@ -180,12 +184,12 @@ func (w *Writer) write(bufs ...[]byte) error {
if err != nil { if err != nil {
return err return err
} }
// For now the index file must not grow beyond 4GiB. Some of the fixed-sized // For now the index file must not grow beyond 64GiB. Some of the fixed-sized
// offset references in v1 are only 4 bytes large. // offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation // Once we move to compressed/varint representations in those areas, this limitation
// can be lifted. // can be lifted.
if w.pos > math.MaxUint32 { if w.pos > 16*math.MaxUint32 {
return errors.Errorf("exceeding max size of 4GiB") return errors.Errorf("exceeding max size of 64GiB")
} }
} }
return nil return nil
@ -250,6 +254,7 @@ func (w *Writer) writeMeta() error {
return w.write(w.buf1.get()) return w.write(w.buf1.get())
} }
// AddSeries adds the series one at a time along with its chunks.
func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error { func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error {
if err := w.ensureStage(idxStageSeries); err != nil { if err := w.ensureStage(idxStageSeries); err != nil {
return err return err
@ -261,7 +266,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
if _, ok := w.seriesOffsets[ref]; ok { if _, ok := w.seriesOffsets[ref]; ok {
return errors.Errorf("series with reference %d already added", ref) return errors.Errorf("series with reference %d already added", ref)
} }
w.seriesOffsets[ref] = w.pos w.addPadding(16)
w.seriesOffsets[ref] = w.pos / 16
w.buf2.reset() w.buf2.reset()
w.buf2.putUvarint(len(lset)) w.buf2.putUvarint(len(lset))
@ -531,9 +537,11 @@ type Reader struct {
// the block has been unmapped. // the block has been unmapped.
symbols map[uint32]string symbols map[uint32]string
dec *DecoderV1 dec *Decoder
crc32 hash.Hash32 crc32 hash.Hash32
version int
} }
var ( var (
@ -563,20 +571,20 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
} }
// NewReader returns a new IndexReader on the given byte slice. // NewReader returns a new IndexReader on the given byte slice.
func NewReader(b ByteSlice) (*Reader, error) { func NewReader(b ByteSlice, version int) (*Reader, error) {
return newReader(b, nil) return newReader(b, nil, version)
} }
// NewFileReader returns a new index reader against the given index file. // NewFileReader returns a new index reader against the given index file.
func NewFileReader(path string) (*Reader, error) { func NewFileReader(path string, version int) (*Reader, error) {
f, err := fileutil.OpenMmapFile(path) f, err := fileutil.OpenMmapFile(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newReader(realByteSlice(f.Bytes()), f) return newReader(realByteSlice(f.Bytes()), f, version)
} }
func newReader(b ByteSlice, c io.Closer) (*Reader, error) { func newReader(b ByteSlice, c io.Closer, version int) (*Reader, error) {
r := &Reader{ r := &Reader{
b: b, b: b,
c: c, c: c,
@ -584,6 +592,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
labels: map[string]uint32{}, labels: map[string]uint32{},
postings: map[labels.Label]uint32{}, postings: map[labels.Label]uint32{},
crc32: newCRC32(), crc32: newCRC32(),
version: version,
}
if version != 1 && version != 2 {
return nil, errors.Errorf("unexpected file version %d", version)
} }
// Verify magic number. // Verify magic number.
if b.Len() < 4 { if b.Len() < 4 {
@ -622,7 +636,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
return nil, errors.Wrap(err, "read postings table") return nil, errors.Wrap(err, "read postings table")
} }
r.dec = &DecoderV1{symbols: r.symbols} r.dec = &Decoder{symbols: r.symbols}
return r, nil return r, nil
} }
@ -852,9 +866,13 @@ func (r *Reader) LabelIndices() ([][]string, error) {
return res, nil return res, nil
} }
// Series the series with the given ID and writes its labels and chunks into lbls and chks. // Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
d := r.decbufUvarintAt(int(id)) offset := id
if r.version == 2 {
offset = 16 * id
}
d := r.decbufUvarintAt(int(offset))
if d.err() != nil { if d.err() != nil {
return d.err() return d.err()
} }
@ -955,15 +973,15 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
return res, nil return res, nil
} }
// DecoderV1 provides decoding methods for the v1 index file format. // Decoder provides decoding methods for the v1 and v2 index file formats.
// //
// It currently does not contain decoding methods for all entry types but can be extended // It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand. // by them if there's demand.
type DecoderV1 struct { type Decoder struct {
symbols map[uint32]string symbols map[uint32]string
} }
func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) { func (dec *Decoder) lookupSymbol(o uint32) (string, error) {
s, ok := dec.symbols[o] s, ok := dec.symbols[o]
if !ok { if !ok {
return "", errors.Errorf("unknown symbol offset %d", o) return "", errors.Errorf("unknown symbol offset %d", o)
@ -973,12 +991,12 @@ func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) {
// SetSymbolTable sets the symbol table to be used for lookups when decoding series // SetSymbolTable sets the symbol table to be used for lookups when decoding series
// and label indices. // and label indices.
func (dec *DecoderV1) SetSymbolTable(t map[uint32]string) { func (dec *Decoder) SetSymbolTable(t map[uint32]string) {
dec.symbols = t dec.symbols = t
} }
// Postings returns a postings list for b and its number of elements. // Postings returns a postings list for b and its number of elements.
func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) { func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
d := decbuf{b: b} d := decbuf{b: b}
n := d.be32int() n := d.be32int()
l := d.get() l := d.get()
@ -986,7 +1004,7 @@ func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) {
} }
// Series decodes a series entry from the given byte slice into lset and chks. // Series decodes a series entry from the given byte slice into lset and chks.
func (dec *DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
*lbls = (*lbls)[:0] *lbls = (*lbls)[:0]
*chks = (*chks)[:0] *chks = (*chks)[:0]

View file

@ -160,7 +160,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, iw.Close()) testutil.Ok(t, iw.Close())
ir, err := NewFileReader(fn) ir, err := NewFileReader(fn, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, ir.Close()) testutil.Ok(t, ir.Close())
@ -170,7 +170,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
_, err = f.WriteAt([]byte{0, 0}, 0) _, err = f.WriteAt([]byte{0, 0}, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = NewFileReader(dir) _, err = NewFileReader(dir, 1)
testutil.NotOk(t, err) testutil.NotOk(t, err)
} }
@ -213,7 +213,7 @@ func TestIndexRW_Postings(t *testing.T) {
testutil.Ok(t, iw.Close()) testutil.Ok(t, iw.Close())
ir, err := NewFileReader(fn) ir, err := NewFileReader(fn, 2)
testutil.Ok(t, err) testutil.Ok(t, err)
p, err := ir.Postings("a", "1") p, err := ir.Postings("a", "1")
@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) {
err = iw.Close() err = iw.Close()
testutil.Ok(t, err) testutil.Ok(t, err)
ir, err := NewFileReader(filepath.Join(dir, "index")) ir, err := NewFileReader(filepath.Join(dir, "index"), 2)
testutil.Ok(t, err) testutil.Ok(t, err)
for p := range mi.postings { for p := range mi.postings {