Tweak IndexWriter interface, serialize directly into dir

This commit is contained in:
Fabian Reinartz 2017-02-25 07:24:20 +01:00
parent 78780cd2ba
commit 9a5dfadb09
4 changed files with 78 additions and 81 deletions

View file

@ -64,8 +64,6 @@ type persistedBlock struct {
dir string dir string
meta BlockMeta meta BlockMeta
indexf *mmapFile
chunkr *chunkReader chunkr *chunkReader
indexr *indexReader indexr *indexReader
} }
@ -124,24 +122,14 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
// ir, err := newIndexReader(dir) ir, err := newIndexReader(dir)
// if err != nil {
// return nil, err
// }
indexf, err := openMmapFile(indexFileName(dir))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "open index file") return nil, err
}
ir, err := newIndexReader(indexf.b)
if err != nil {
return nil, errors.Wrap(err, "create index reader")
} }
pb := &persistedBlock{ pb := &persistedBlock{
dir: dir, dir: dir,
meta: *meta, meta: *meta,
indexf: indexf,
chunkr: cr, chunkr: cr,
indexr: ir, indexr: ir,
} }
@ -150,7 +138,7 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
func (pb *persistedBlock) Close() error { func (pb *persistedBlock) Close() error {
err0 := pb.chunkr.Close() err0 := pb.chunkr.Close()
err1 := pb.indexf.Close() err1 := pb.indexr.Close()
if err0 != nil { if err0 != nil {
return err0 return err0

View file

@ -163,16 +163,14 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
return err return err
} }
indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return errors.Wrap(err, "create index file")
}
indexw := newIndexWriter(indexf)
chunkw, err := newChunkWriter(filepath.Join(dir, "chunks")) chunkw, err := newChunkWriter(filepath.Join(dir, "chunks"))
if err != nil { if err != nil {
return errors.Wrap(err, "open chunk writer") return errors.Wrap(err, "open chunk writer")
} }
indexw, err := newIndexWriter(dir)
if err != nil {
return errors.Wrap(err, "open index writer")
}
if err = c.write(dir, blocks, indexw, chunkw); err != nil { if err = c.write(dir, blocks, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction") return errors.Wrap(err, "write compaction")
@ -184,12 +182,6 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
if err = indexw.Close(); err != nil { if err = indexw.Close(); err != nil {
return errors.Wrap(err, "close index writer") return errors.Wrap(err, "close index writer")
} }
if err = fileutil.Fsync(indexf); err != nil {
return errors.Wrap(err, "fsync index file")
}
if err = indexf.Close(); err != nil {
return errors.Wrap(err, "close index file")
}
return nil return nil
} }

View file

@ -4,6 +4,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io" "io"
"path/filepath"
"strings" "strings"
"github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/chunks"
@ -26,6 +27,7 @@ type chunkReader struct {
// The underlying bytes holding the encoded series data. // The underlying bytes holding the encoded series data.
bs [][]byte bs [][]byte
// Closers for resources behind the byte slices.
cs []io.Closer cs []io.Closer
} }
@ -104,6 +106,9 @@ type IndexReader interface {
// LabelIndices returns the label pairs for which indices exist. // LabelIndices returns the label pairs for which indices exist.
LabelIndices() ([][]string, error) LabelIndices() ([][]string, error)
// Close releases the underlying resources of the reader.
Close() error
} }
// StringTuples provides access to a sorted list of string tuples. // StringTuples provides access to a sorted list of string tuples.
@ -118,6 +123,9 @@ type indexReader struct {
// The underlying byte slice holding the encoded series data. // The underlying byte slice holding the encoded series data.
b []byte b []byte
// Closer that releases the underlying resources of the byte slice.
c io.Closer
// Cached hashmaps of section offsets. // Cached hashmaps of section offsets.
labels map[string]uint32 labels map[string]uint32
postings map[string]uint32 postings map[string]uint32
@ -128,34 +136,38 @@ var (
errInvalidFlag = fmt.Errorf("invalid flag") errInvalidFlag = fmt.Errorf("invalid flag")
) )
func newIndexReader(b []byte) (*indexReader, error) { // newIndexReader returns a new indexReader on the given directory.
if len(b) < 4 { func newIndexReader(dir string) (*indexReader, error) {
return nil, errors.Wrap(errInvalidSize, "index header") f, err := openMmapFile(filepath.Join(dir, "index"))
if err != nil {
return nil, err
} }
r := &indexReader{b: b} r := &indexReader{b: f.b, c: f}
// Verify magic number. // Verify magic number.
if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex { if len(f.b) < 4 {
return nil, fmt.Errorf("invalid magic number %x", m) return nil, errors.Wrap(errInvalidSize, "index header")
}
if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
return nil, errors.Errorf("invalid magic number %x", m)
} }
var err error
// The last two 4-byte values hold the pointers to the hashmaps. // The last two 4-byte values hold the pointers to the hashmaps.
loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4]) loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
poff := binary.BigEndian.Uint32(b[len(b)-4:]) poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
f, b, err := r.section(loff) flag, b, err := r.section(loff)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
} }
if r.labels, err = readHashmap(f, b); err != nil { if r.labels, err = readHashmap(flag, b); err != nil {
return nil, errors.Wrap(err, "read label index hashmap") return nil, errors.Wrap(err, "read label index hashmap")
} }
f, b, err = r.section(poff) flag, b, err = r.section(poff)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
} }
if r.postings, err = readHashmap(f, b); err != nil { if r.postings, err = readHashmap(flag, b); err != nil {
return nil, errors.Wrap(err, "read postings hashmap") return nil, errors.Wrap(err, "read postings hashmap")
} }
@ -193,6 +205,10 @@ func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
return h, nil return h, nil
} }
// Close closes the reader's underlying closer (r.c) and returns its error.
// newIndexReader sets r.c to the mmapped index file that backs r.b, so r.b
// should presumably not be accessed once Close has been called.
func (r *indexReader) Close() error {
return r.c.Close()
}
func (r *indexReader) section(o uint32) (byte, []byte, error) { func (r *indexReader) section(o uint32) (byte, []byte, error) {
b := r.b[o:] b := r.b[o:]

View file

@ -7,6 +7,7 @@ import (
"hash/crc32" "hash/crc32"
"io" "io"
"os" "os"
"path/filepath"
"sort" "sort"
"strings" "strings"
@ -35,9 +36,6 @@ type ChunkWriter interface {
// is set and can be used to retrieve the chunks from the written data. // is set and can be used to retrieve the chunks from the written data.
WriteChunks(chunks ...ChunkMeta) error WriteChunks(chunks ...ChunkMeta) error
// Size returns the size of the data written so far.
Size() int64
// Close writes any required finalization and closes the resources // Close writes any required finalization and closes the resources
// associated with the underlying writer. // associated with the underlying writer.
Close() error Close() error
@ -214,10 +212,6 @@ func (w *chunkWriter) seq() int {
return len(w.files) - 1 return len(w.files) - 1
} }
func (w *chunkWriter) Size() int64 {
return w.n
}
func (w *chunkWriter) Close() error { func (w *chunkWriter) Close() error {
return w.finalizeTail() return w.finalizeTail()
} }
@ -240,7 +234,7 @@ type IndexWriter interface {
// of chunks that the index can reference. // of chunks that the index can reference.
// The reference number is used to resolve a series against the postings // The reference number is used to resolve a series against the postings
// list iterator. It only has to be available during the write processing. // list iterator. It only has to be available during the write processing.
AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error
// WriteLabelIndex serializes an index from label names to values. // WriteLabelIndex serializes an index from label names to values.
// The passed in values are chained tuples of strings of the length of names. // The passed in values are chained tuples of strings of the length of names.
@ -249,9 +243,6 @@ type IndexWriter interface {
// WritePostings writes a postings list for a single label pair. // WritePostings writes a postings list for a single label pair.
WritePostings(name, value string, it Postings) error WritePostings(name, value string, it Postings) error
// Size returns the size of the data written so far.
Size() int64
// Close writes any finalization and closes the resources associated with // Close writes any finalization and closes the resources associated with
// the underlying writer. // the underlying writer.
Close() error Close() error
@ -266,13 +257,12 @@ type indexWriterSeries struct {
// indexWriter implements the IndexWriter interface for the standard // indexWriter implements the IndexWriter interface for the standard
// serialization format. // serialization format.
type indexWriter struct { type indexWriter struct {
ow io.Writer f *os.File
w *bufio.Writer bufw *bufio.Writer
n int64 n int64
started bool started bool
series map[uint32]*indexWriterSeries series map[uint32]*indexWriterSeries
symbols map[string]uint32 // symbol offsets symbols map[string]uint32 // symbol offsets
labelIndexes []hashEntry // label index offsets labelIndexes []hashEntry // label index offsets
postings []hashEntry // postings lists offsets postings []hashEntry // postings lists offsets
@ -280,15 +270,31 @@ type indexWriter struct {
crc32 hash.Hash crc32 hash.Hash
} }
func newIndexWriter(w io.Writer) *indexWriter { func newIndexWriter(dir string) (*indexWriter, error) {
return &indexWriter{ df, err := fileutil.OpenDir(dir)
w: bufio.NewWriterSize(w, 1*1024*1024), if err != nil {
ow: w, return nil, err
}
f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, err
}
if err := fileutil.Fsync(df); err != nil {
return nil, errors.Wrap(err, "sync dir")
}
iw := &indexWriter{
f: f,
bufw: bufio.NewWriterSize(f, 1*1024*1024),
n: 0, n: 0,
symbols: make(map[string]uint32, 4096), symbols: make(map[string]uint32, 4096),
series: make(map[uint32]*indexWriterSeries, 4096), series: make(map[uint32]*indexWriterSeries, 4096),
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
} }
if err := iw.writeMeta(); err != nil {
return nil, err
}
return iw, nil
} }
func (w *indexWriter) write(wr io.Writer, b []byte) error { func (w *indexWriter) write(wr io.Writer, b []byte) error {
@ -300,7 +306,7 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error {
// section writes a CRC32 checksummed section of length l and guarded by flag. // section writes a CRC32 checksummed section of length l and guarded by flag.
func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error { func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error {
w.crc32.Reset() w.crc32.Reset()
wr := io.MultiWriter(w.crc32, w.w) wr := io.MultiWriter(w.crc32, w.bufw)
b := [5]byte{flag, 0, 0, 0, 0} b := [5]byte{flag, 0, 0, 0, 0}
binary.BigEndian.PutUint32(b[1:], l) binary.BigEndian.PutUint32(b[1:], l)
@ -310,9 +316,9 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er
} }
if err := f(wr); err != nil { if err := f(wr); err != nil {
return errors.Wrap(err, "contents write func") return errors.Wrap(err, "write contents")
} }
if err := w.write(w.w, w.crc32.Sum(nil)); err != nil { if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
return errors.Wrap(err, "writing checksum") return errors.Wrap(err, "writing checksum")
} }
return nil return nil
@ -324,10 +330,13 @@ func (w *indexWriter) writeMeta() error {
binary.BigEndian.PutUint32(b[:4], MagicIndex) binary.BigEndian.PutUint32(b[:4], MagicIndex)
b[4] = flagStd b[4] = flagStd
return w.write(w.w, b[:]) return w.write(w.bufw, b[:])
} }
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) { func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
if _, ok := w.series[ref]; ok {
return errors.Errorf("series with reference %d already added", ref)
}
// Populate the symbol table from all label sets we have to reference. // Populate the symbol table from all label sets we have to reference.
for _, l := range lset { for _, l := range lset {
w.symbols[l.Name] = 0 w.symbols[l.Name] = 0
@ -338,6 +347,7 @@ func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkM
labels: lset, labels: lset,
chunks: chunks, chunks: chunks,
} }
return nil
} }
func (w *indexWriter) writeSymbols() error { func (w *indexWriter) writeSymbols() error {
@ -425,9 +435,6 @@ func (w *indexWriter) writeSeries() error {
} }
func (w *indexWriter) init() error { func (w *indexWriter) init() error {
if err := w.writeMeta(); err != nil {
return err
}
if err := w.writeSymbols(); err != nil { if err := w.writeSymbols(); err != nil {
return err return err
} }
@ -524,10 +531,6 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
}) })
} }
func (w *indexWriter) Size() int64 {
return w.n
}
type hashEntry struct { type hashEntry struct {
name string name string
offset uint32 offset uint32
@ -567,24 +570,22 @@ func (w *indexWriter) finalize() error {
// for any index query. // for any index query.
// TODO(fabxc): also store offset to series section to allow plain // TODO(fabxc): also store offset to series section to allow plain
// iteration over all existing series? // iteration over all existing series?
// TODO(fabxc): store references like these that are not resolved via direct
// mmap using explicit endianness?
b := [8]byte{} b := [8]byte{}
binary.BigEndian.PutUint32(b[:4], lo) binary.BigEndian.PutUint32(b[:4], lo)
binary.BigEndian.PutUint32(b[4:], po) binary.BigEndian.PutUint32(b[4:], po)
return w.write(w.w, b[:]) return w.write(w.bufw, b[:])
} }
func (w *indexWriter) Close() error { func (w *indexWriter) Close() error {
// Handle blocks without any data.
if !w.started {
if err := w.init(); err != nil {
return err
}
}
if err := w.finalize(); err != nil { if err := w.finalize(); err != nil {
return err return err
} }
return w.w.Flush() if err := w.bufw.Flush(); err != nil {
return err
}
if err := fileutil.Fsync(w.f); err != nil {
return err
}
return w.f.Close()
} }