vendor: re-vendor storage

Commit 097a2c1e59 (parent 4397b4d508) in prometheus/prometheus
(mirror of https://github.com/prometheus/prometheus.git)
vendor/github.com/fabxc/tsdb/block.go (generated, vendored): 58 changes

@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"sort"
 
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 )
 
@@ -22,11 +23,7 @@ type Block interface {
 	Index() IndexReader
 
 	// Series returns a SeriesReader over the block's data.
-	Series() SeriesReader
+	Chunks() ChunkReader
 
-	// Persisted returns whether the block is already persisted,
-	// and no longer being appended to.
-	Persisted() bool
-
 	// Close releases all underlying resources of the block.
 	Close() error
@@ -34,6 +31,9 @@ type Block interface {
 
 // BlockMeta provides meta information about a block.
 type BlockMeta struct {
+	// Unique identifier for the block and its contents. Changes on compaction.
+	ULID ulid.ULID `json:"ulid"`
+
 	// Sequence number of the block.
 	Sequence int `json:"sequence"`
 
@@ -64,9 +64,7 @@ type persistedBlock struct {
 	dir  string
 	meta BlockMeta
 
-	chunksf, indexf *mmapFile
-
-	chunkr *seriesReader
+	chunkr *chunkReader
 	indexr *indexReader
 }
 
@@ -120,58 +118,40 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
 		return nil, err
 	}
 
-	chunksf, err := openMmapFile(chunksFileName(dir))
+	cr, err := newChunkReader(chunkDir(dir))
 	if err != nil {
-		return nil, errors.Wrap(err, "open chunk file")
+		return nil, err
 	}
-	indexf, err := openMmapFile(indexFileName(dir))
+	ir, err := newIndexReader(dir)
 	if err != nil {
-		return nil, errors.Wrap(err, "open index file")
-	}
-
-	sr, err := newSeriesReader([][]byte{chunksf.b})
-	if err != nil {
-		return nil, errors.Wrap(err, "create series reader")
-	}
-	ir, err := newIndexReader(indexf.b)
-	if err != nil {
-		return nil, errors.Wrap(err, "create index reader")
+		return nil, err
 	}
 
 	pb := &persistedBlock{
 		dir:  dir,
 		meta: *meta,
-		chunksf: chunksf,
-		indexf:  indexf,
-		chunkr:  sr,
+		chunkr: cr,
 		indexr: ir,
 	}
 	return pb, nil
 }
 
 func (pb *persistedBlock) Close() error {
-	err0 := pb.chunksf.Close()
-	err1 := pb.indexf.Close()
+	var merr MultiError
 
-	if err0 != nil {
-		return err0
-	}
-	return err1
+	merr.Add(pb.chunkr.Close())
+	merr.Add(pb.indexr.Close())
+
+	return merr.Err()
 }
 
 func (pb *persistedBlock) Dir() string { return pb.dir }
-func (pb *persistedBlock) Persisted() bool { return true }
 func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
-func (pb *persistedBlock) Series() SeriesReader { return pb.chunkr }
+func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
 func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
 
-func chunksFileName(path string) string {
-	return filepath.Join(path, "chunks-000")
-}
-
-func indexFileName(path string) string {
-	return filepath.Join(path, "index-000")
-}
+func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
+func walDir(dir string) string   { return filepath.Join(dir, "wal") }
 
 type mmapFile struct {
 	f *os.File
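Both the new persistedBlock.Close above and the closeAll helper added to db.go aggregate errors through tsdb's MultiError instead of returning only the first failure, so every underlying resource still gets a Close attempt. The type itself is not part of this diff; the following is a minimal sketch inferred from the Add/Err call sites, not the package's actual implementation:

```go
package tsdb

import "strings"

// MultiError collects multiple errors and presents them as one.
// A sketch assuming only the semantics implied by the call sites:
// the real type lives elsewhere in the tsdb package.
type MultiError []error

// Add appends the error if it is non-nil, so Close() results can be
// chained without checking each one individually.
func (es *MultiError) Add(err error) {
	if err != nil {
		*es = append(*es, err)
	}
}

// Err returns nil when nothing was collected, otherwise the collection,
// preserving the "nil means success" convention of `return merr.Err()`.
func (es MultiError) Err() error {
	if len(es) == 0 {
		return nil
	}
	return es
}

// Error joins the collected messages, satisfying the error interface.
func (es MultiError) Error() string {
	msgs := make([]string, 0, len(es))
	for _, e := range es {
		msgs = append(msgs, e.Error())
	}
	return strings.Join(msgs, "; ")
}
```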
vendor/github.com/fabxc/tsdb/compact.go (generated, vendored): 52 changes

@@ -1,12 +1,14 @@
 package tsdb
 
 import (
+	"math/rand"
 	"os"
 	"path/filepath"
 	"time"
 
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/fabxc/tsdb/labels"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 )
@@ -60,11 +62,12 @@ func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
 }
 
 type compactionInfo struct {
+	seq        int
 	generation int
 	mint, maxt int64
 }
 
-const compactionBlocksLen = 4
+const compactionBlocksLen = 3
 
 // pick returns a range [i, j) in the blocks that are suitable to be compacted
 // into a single block at position i.
@@ -103,8 +106,8 @@ func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
 	}
 
 	// Then we care about compacting multiple blocks, starting with the oldest.
-	for i := 0; i < len(bs)-compactionBlocksLen; i += compactionBlocksLen {
-		if c.match(bs[i : i+2]) {
+	for i := 0; i < len(bs)-compactionBlocksLen+1; i += compactionBlocksLen {
+		if c.match(bs[i : i+3]) {
 			return i, i + compactionBlocksLen, true
 		}
 	}
@@ -114,28 +117,24 @@ func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
 
 func (c *compactor) match(bs []compactionInfo) bool {
 	g := bs[0].generation
-	if g >= 5 {
-		return false
-	}
 
 	for _, b := range bs {
-		if b.generation == 0 {
-			continue
-		}
 		if b.generation != g {
 			return false
 		}
 	}
 
 	return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange
 }
 
+var entropy = rand.New(rand.NewSource(time.Now().UnixNano()))
+
 func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
 	m0 := blocks[0].Meta()
 
 	res.Sequence = m0.Sequence
 	res.MinTime = m0.MinTime
 	res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
+	res.ULID = ulid.MustNew(ulid.Now(), entropy)
 
 	g := m0.Compaction.Generation
 	if g == 0 && len(blocks) > 1 {
@@ -166,18 +165,15 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
 		return err
 	}
 
-	chunkf, err := os.OpenFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
+	chunkw, err := newChunkWriter(chunkDir(dir))
 	if err != nil {
-		return errors.Wrap(err, "create chunk file")
+		return errors.Wrap(err, "open chunk writer")
 	}
-	indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
+	indexw, err := newIndexWriter(dir)
 	if err != nil {
-		return errors.Wrap(err, "create index file")
+		return errors.Wrap(err, "open index writer")
 	}
 
-	indexw := newIndexWriter(indexf)
-	chunkw := newChunkWriter(chunkf)
-
 	if err = c.write(dir, blocks, indexw, chunkw); err != nil {
 		return errors.Wrap(err, "write compaction")
 	}
@@ -188,18 +184,6 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
 	if err = indexw.Close(); err != nil {
 		return errors.Wrap(err, "close index writer")
 	}
-	if err = fileutil.Fsync(chunkf); err != nil {
-		return errors.Wrap(err, "fsync chunk file")
-	}
-	if err = fileutil.Fsync(indexf); err != nil {
-		return errors.Wrap(err, "fsync index file")
-	}
-	if err = chunkf.Close(); err != nil {
-		return errors.Wrap(err, "close chunk file")
-	}
-	if err = indexf.Close(); err != nil {
-		return errors.Wrap(err, "close index file")
-	}
 	return nil
 }
 
@@ -215,7 +199,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
 		if hb, ok := b.(*headBlock); ok {
 			all = hb.remapPostings(all)
 		}
-		s := newCompactionSeriesSet(b.Index(), b.Series(), all)
+		s := newCompactionSeriesSet(b.Index(), b.Chunks(), all)
 
 		if i == 0 {
 			set = s
@@ -300,17 +284,17 @@ type compactionSet interface {
 type compactionSeriesSet struct {
 	p     Postings
 	index IndexReader
-	series SeriesReader
+	chunks ChunkReader
 
 	l   labels.Labels
 	c   []ChunkMeta
 	err error
 }
 
-func newCompactionSeriesSet(i IndexReader, s SeriesReader, p Postings) *compactionSeriesSet {
+func newCompactionSeriesSet(i IndexReader, c ChunkReader, p Postings) *compactionSeriesSet {
 	return &compactionSeriesSet{
 		index: i,
-		series: s,
+		chunks: c,
 		p: p,
 	}
 }
@@ -327,7 +311,7 @@ func (c *compactionSeriesSet) Next() bool {
 	for i := range c.c {
 		chk := &c.c[i]
 
-		chk.Chunk, c.err = c.series.Chunk(chk.Ref)
+		chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
 		if c.err != nil {
 			return false
 		}
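The loop bound change in pick is easy to miss: with the step equal to the window size, the old bound `i < len(bs)-compactionBlocksLen` rejected the start index of the final complete window, so a trailing group of exactly compactionBlocksLen blocks was never considered. The sketch below isolates just that index arithmetic with the new window length of 3 (the generation and time-range checks in match are deliberately left out):

```go
package main

import "fmt"

const compactionBlocksLen = 3

// windows reproduces only the index arithmetic of compactor.pick:
// which [i, i+compactionBlocksLen) ranges are candidate windows for
// n blocks, with and without the "+1" this commit adds to the bound.
func windows(n int, inclusiveBound bool) (out [][2]int) {
	bound := n - compactionBlocksLen
	if inclusiveBound {
		bound++ // the "+1" added by this commit
	}
	for i := 0; i < bound; i += compactionBlocksLen {
		out = append(out, [2]int{i, i + compactionBlocksLen})
	}
	return out
}

func main() {
	// With six blocks, the old bound misses the trailing complete window.
	fmt.Println(windows(6, false)) // [[0 3]]
	fmt.Println(windows(6, true))  // [[0 3] [3 6]]
}
```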
vendor/github.com/fabxc/tsdb/db.go (generated, vendored): 39 changes

@@ -4,6 +4,7 @@ package tsdb
 import (
 	"bytes"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"math"
 	"os"
@@ -131,7 +132,7 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
 }
 
 // Open returns a new DB in the given directory.
-func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
+func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
 	if err := os.MkdirAll(dir, 0777); err != nil {
 		return nil, err
 	}
@@ -148,8 +149,10 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
 		return nil, errors.Wrapf(err, "open DB in %s", dir)
 	}
 
-	var r prometheus.Registerer
-	// r := prometheus.DefaultRegisterer
+	if l == nil {
+		l = log.NewLogfmtLogger(os.Stdout)
+		l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
+	}
 
 	if opts == nil {
 		opts = DefaultOptions
@@ -161,7 +164,7 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
 	db = &DB{
 		dir:      dir,
 		lockf:    lockf,
-		logger:   logger,
+		logger:   l,
 		metrics:  newDBMetrics(r),
 		opts:     opts,
 		compactc: make(chan struct{}, 1),
@@ -189,6 +192,7 @@ func (db *DB) run() {
 		case <-db.compactc:
 			db.metrics.compactionsTriggered.Inc()
 
+			var seqs []int
 			var infos []compactionInfo
 			for _, b := range db.compactable() {
 				m := b.Meta()
@@ -197,17 +201,16 @@ func (db *DB) run() {
 					generation: m.Compaction.Generation,
 					mint:       m.MinTime,
 					maxt:       m.MaxTime,
+					seq:        m.Sequence,
 				})
+				seqs = append(seqs, m.Sequence)
 			}
 
 			i, j, ok := db.compactor.pick(infos)
 			if !ok {
 				continue
 			}
-			db.logger.Log("msg", "picked", "i", i, "j", j)
-			for k := i; k < j; k++ {
-				db.logger.Log("k", k, "generation", infos[k].generation)
-			}
+			db.logger.Log("msg", "compact", "seqs", fmt.Sprintf("%v", seqs[i:j]))
 
 			if err := db.compact(i, j); err != nil {
 				db.logger.Log("msg", "compaction failed", "err", err)
@@ -298,11 +301,16 @@ func (db *DB) compact(i, j int) error {
 	db.persisted = append(db.persisted, pb)
 
 	for _, b := range blocks[1:] {
+		db.logger.Log("msg", "remove old dir", "dir", b.Dir())
 		if err := os.RemoveAll(b.Dir()); err != nil {
 			return errors.Wrap(err, "removing old block")
 		}
 	}
-	return db.retentionCutoff()
+	if err := db.retentionCutoff(); err != nil {
+		return err
+	}
+
+	return nil
 }
 
 func (db *DB) retentionCutoff() error {
@@ -726,7 +734,7 @@ func isPowTwo(x int) bool {
 }
 
 // OpenPartitioned or create a new DB.
-func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*PartitionedDB, error) {
+func OpenPartitioned(dir string, n int, l log.Logger, r prometheus.Registerer, opts *Options) (*PartitionedDB, error) {
 	if !isPowTwo(n) {
 		return nil, errors.Errorf("%d is not a power of two", n)
 	}
@@ -754,7 +762,7 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition
 		l := log.NewContext(l).With("partition", i)
 		d := partitionDir(dir, i)
 
-		s, err := Open(d, l, opts)
+		s, err := Open(d, l, r, opts)
 		if err != nil {
 			return nil, fmt.Errorf("initializing partition %q failed: %s", d, err)
 		}
@@ -880,3 +888,12 @@ func yoloString(b []byte) string {
 	}
 	return *((*string)(unsafe.Pointer(&h)))
 }
+
+func closeAll(cs ...io.Closer) error {
+	var merr MultiError
+
+	for _, c := range cs {
+		merr.Add(c.Close())
+	}
+	return merr.Err()
+}
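Open now takes the Prometheus registerer from the caller rather than leaving it nil internally, and falls back to a timestamped logfmt logger when none is supplied. A hedged caller-side sketch against the new signature; the data directory is illustrative, and the deferred Close assumes the DB type's usual closer, which this diff does not show:

```go
package main

import (
	"os"

	"github.com/fabxc/tsdb"
	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// The registerer is now an explicit argument; passing nil for the
	// logger would instead trigger the new stdout logfmt fallback.
	db, err := tsdb.Open("data/", logger, prometheus.DefaultRegisterer, tsdb.DefaultOptions)
	if err != nil {
		logger.Log("msg", "open failed", "err", err)
		os.Exit(1)
	}
	defer db.Close()
}
```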
vendor/github.com/fabxc/tsdb/head.go (generated, vendored): 27 changes

@@ -14,6 +14,7 @@ import (
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/go-kit/kit/log"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 )
 
@@ -62,11 +63,16 @@ type headBlock struct {
 }
 
 func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) {
-	if err := os.MkdirAll(dir, 0755); err != nil {
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return nil, err
+	}
+	ulid, err := ulid.New(ulid.Now(), entropy)
+	if err != nil {
 		return nil, err
 	}
 
 	if err := writeMetaFile(dir, &BlockMeta{
+		ULID:     ulid,
 		Sequence: seq,
 		MinTime:  mint,
 		MaxTime:  maxt,
@@ -133,10 +139,19 @@ func (h *headBlock) inBounds(t int64) bool {
 
 // Close syncs all data and closes underlying resources of the head block.
 func (h *headBlock) Close() error {
-	if err := writeMetaFile(h.dir, &h.meta); err != nil {
+	if err := h.wal.Close(); err != nil {
 		return err
 	}
-	return h.wal.Close()
+	// Check whether the head block still exists in the underlying dir
+	// or has already been replaced with a compacted version
+	meta, err := readMetaFile(h.dir)
+	if err != nil {
+		return err
+	}
+	if meta.ULID == h.meta.ULID {
+		return writeMetaFile(h.dir, &h.meta)
+	}
+	return nil
 }
 
 func (h *headBlock) Meta() BlockMeta {
@@ -149,7 +164,7 @@ func (h *headBlock) Meta() BlockMeta {
 func (h *headBlock) Dir() string { return h.dir }
 func (h *headBlock) Persisted() bool { return false }
 func (h *headBlock) Index() IndexReader { return &headIndexReader{h} }
-func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} }
+func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} }
 
 func (h *headBlock) Appender() Appender {
 	atomic.AddUint64(&h.activeWriters, 1)
@@ -359,12 +374,12 @@ func (a *headAppender) Rollback() error {
 	return nil
 }
 
-type headSeriesReader struct {
+type headChunkReader struct {
 	*headBlock
 }
 
 // Chunk returns the chunk for the reference number.
-func (h *headSeriesReader) Chunk(ref uint64) (chunks.Chunk, error) {
+func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
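The ULID added to BlockMeta is what makes the new Close guard work: when compaction replaces the head block's directory, the meta file on disk carries a different ULID, so Close compares identifiers before rewriting the meta and avoids clobbering the compacted block's metadata. A standalone sketch of generating such an identifier with github.com/oklog/ulid, mirroring the calls in the diff; the printed value is illustrative:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/oklog/ulid"
)

func main() {
	// Same construction as the package-level entropy source in compact.go.
	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))

	// ulid.Now() yields the current time in milliseconds; the entropy
	// source fills the random half, so IDs sort by creation time.
	id, err := ulid.New(ulid.Now(), entropy)
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // e.g. 01ARZ3NDEKTSV4RRFFQ69G5FAV
}
```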
vendor/github.com/fabxc/tsdb/querier.go (generated, vendored): 12 changes

@@ -59,7 +59,7 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 			mint:  mint,
 			maxt:  maxt,
 			index: b.Index(),
-			series: b.Series(),
+			chunks: b.Chunks(),
 		}
 
 		// TODO(fabxc): find nicer solution.
@@ -123,19 +123,19 @@ func (q *querier) Close() error {
 // blockQuerier provides querying access to a single block database.
 type blockQuerier struct {
 	index  IndexReader
-	series SeriesReader
+	chunks ChunkReader
 
 	postingsMapper func(Postings) Postings
 
 	mint, maxt int64
 }
 
-func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier {
+func newBlockQuerier(ix IndexReader, c ChunkReader, mint, maxt int64) *blockQuerier {
 	return &blockQuerier{
 		mint:  mint,
 		maxt:  maxt,
 		index: ix,
-		series: s,
+		chunks: c,
 	}
 }
 
@@ -162,7 +162,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 
 	return &blockSeriesSet{
 		index:  q.index,
-		chunks: q.series,
+		chunks: q.chunks,
 		it:     p,
 		absent: absent,
 		mint:   q.mint,
@@ -425,7 +425,7 @@ func (s *partitionSeriesSet) Next() bool {
 // blockSeriesSet is a set of series from an inverted index query.
 type blockSeriesSet struct {
 	index  IndexReader
-	chunks SeriesReader
+	chunks ChunkReader
 	it     Postings // postings list referencing series
 	absent []string // labels that must not be set for result series
 	mint, maxt int64 // considered time range
vendor/github.com/fabxc/tsdb/reader.go (generated, vendored): 84 changes

@@ -3,6 +3,8 @@ package tsdb
 import (
 	"encoding/binary"
 	"fmt"
+	"io"
+	"path/filepath"
 	"strings"
 
 	"github.com/fabxc/tsdb/chunks"
@@ -10,23 +12,43 @@ import (
 	"github.com/pkg/errors"
 )
 
-// SeriesReader provides reading access of serialized time series data.
-type SeriesReader interface {
+// ChunkReader provides reading access of serialized time series data.
+type ChunkReader interface {
 	// Chunk returns the series data chunk with the given reference.
 	Chunk(ref uint64) (chunks.Chunk, error)
+
+	// Close releases all underlying resources of the reader.
+	Close() error
 }
 
-// seriesReader implements a SeriesReader for a serialized byte stream
+// chunkReader implements a SeriesReader for a serialized byte stream
 // of series data.
-type seriesReader struct {
+type chunkReader struct {
 	// The underlying bytes holding the encoded series data.
 	bs [][]byte
+
+	// Closers for resources behind the byte slices.
+	cs []io.Closer
 }
 
-func newSeriesReader(bs [][]byte) (*seriesReader, error) {
-	s := &seriesReader{bs: bs}
+// newChunkReader returns a new chunkReader based on mmaped files found in dir.
+func newChunkReader(dir string) (*chunkReader, error) {
+	files, err := sequenceFiles(dir, "")
+	if err != nil {
+		return nil, err
+	}
+	var cr chunkReader
 
-	for i, b := range bs {
+	for _, fn := range files {
+		f, err := openMmapFile(fn)
+		if err != nil {
+			return nil, errors.Wrapf(err, "mmap files")
+		}
+		cr.cs = append(cr.cs, f)
+		cr.bs = append(cr.bs, f.b)
+	}
+
+	for i, b := range cr.bs {
 		if len(b) < 4 {
 			return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
 		}
@@ -35,10 +57,14 @@ func newSeriesReader(bs [][]byte) (*seriesReader, error) {
 			return nil, fmt.Errorf("invalid magic number %x", m)
 		}
 	}
-	return s, nil
+	return &cr, nil
 }
 
-func (s *seriesReader) Chunk(ref uint64) (chunks.Chunk, error) {
+func (s *chunkReader) Close() error {
+	return closeAll(s.cs...)
+}
+
+func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 	var (
 		seq = int(ref >> 32)
 		off = int((ref << 32) >> 32)
@@ -80,6 +106,9 @@ type IndexReader interface {
 
 	// LabelIndices returns the label pairs for which indices exist.
 	LabelIndices() ([][]string, error)
+
+	// Close released the underlying resources of the reader.
+	Close() error
 }
 
 // StringTuples provides access to a sorted list of string tuples.
@@ -94,6 +123,9 @@ type indexReader struct {
 	// The underlying byte slice holding the encoded series data.
 	b []byte
 
+	// Close that releases the underlying resources of the byte slice.
+	c io.Closer
+
 	// Cached hashmaps of section offsets.
 	labels   map[string]uint32
 	postings map[string]uint32
@@ -104,34 +136,38 @@ var (
 	errInvalidFlag = fmt.Errorf("invalid flag")
 )
 
-func newIndexReader(b []byte) (*indexReader, error) {
-	if len(b) < 4 {
-		return nil, errors.Wrap(errInvalidSize, "index header")
+// newIndexReader returns a new indexReader on the given directory.
+func newIndexReader(dir string) (*indexReader, error) {
+	f, err := openMmapFile(filepath.Join(dir, "index"))
+	if err != nil {
+		return nil, err
 	}
-	r := &indexReader{b: b}
+	r := &indexReader{b: f.b, c: f}
 
 	// Verify magic number.
-	if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex {
-		return nil, fmt.Errorf("invalid magic number %x", m)
+	if len(f.b) < 4 {
+		return nil, errors.Wrap(errInvalidSize, "index header")
+	}
+	if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
+		return nil, errors.Errorf("invalid magic number %x", m)
 	}
 
-	var err error
 	// The last two 4 bytes hold the pointers to the hashmaps.
-	loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4])
-	poff := binary.BigEndian.Uint32(b[len(b)-4:])
+	loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
+	poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
 
-	f, b, err := r.section(loff)
+	flag, b, err := r.section(loff)
 	if err != nil {
 		return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
 	}
-	if r.labels, err = readHashmap(f, b); err != nil {
+	if r.labels, err = readHashmap(flag, b); err != nil {
 		return nil, errors.Wrap(err, "read label index hashmap")
 	}
-	f, b, err = r.section(poff)
+	flag, b, err = r.section(poff)
 	if err != nil {
 		return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
 	}
-	if r.postings, err = readHashmap(f, b); err != nil {
+	if r.postings, err = readHashmap(flag, b); err != nil {
 		return nil, errors.Wrap(err, "read postings hashmap")
 	}
 
@@ -169,6 +205,10 @@ func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
 	return h, nil
 }
 
+func (r *indexReader) Close() error {
+	return r.c.Close()
+}
+
 func (r *indexReader) section(o uint32) (byte, []byte, error) {
 	b := r.b[o:]
 
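With chunks now spread across numbered segment files, a chunk reference packs the segment sequence number into the upper 32 bits and the byte offset within that segment into the lower 32, which is exactly what the shifts in chunkReader.Chunk decode. A self-contained round-trip of that encoding:

```go
package main

import "fmt"

// encodeChunkRef packs a segment sequence number and an in-segment byte
// offset into one uint64 reference, as chunkWriter.WriteChunks does with
// `seq | uint64(w.n)` after shifting seq left by 32 bits.
func encodeChunkRef(seq, off uint64) uint64 {
	return seq<<32 | off
}

// decodeChunkRef reverses the packing with the same shifts used by
// chunkReader.Chunk.
func decodeChunkRef(ref uint64) (seq, off int) {
	return int(ref >> 32), int((ref << 32) >> 32)
}

func main() {
	ref := encodeChunkRef(3, 4096) // chunk at offset 4096 of segment 3
	seq, off := decodeChunkRef(ref)
	fmt.Println(seq, off) // 3 4096
}
```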
vendor/github.com/fabxc/tsdb/writer.go (generated, vendored): 236 changes

@@ -6,10 +6,13 @@ import (
 	"hash"
 	"hash/crc32"
 	"io"
+	"os"
+	"path/filepath"
 	"sort"
 	"strings"
 
 	"github.com/bradfitz/slice"
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/pkg/errors"
@@ -33,9 +36,6 @@ type ChunkWriter interface {
 	// is set and can be used to retrieve the chunks from the written data.
 	WriteChunks(chunks ...ChunkMeta) error
 
-	// Size returns the size of the data written so far.
-	Size() int64
-
 	// Close writes any required finalization and closes the resources
 	// associated with the underlying writer.
 	Close() error
@@ -44,20 +44,109 @@ type ChunkWriter interface {
 // chunkWriter implements the ChunkWriter interface for the standard
 // serialization format.
 type chunkWriter struct {
-	ow    io.Writer
-	w     *bufio.Writer
+	dirFile *os.File
+	files   []*os.File
+	wbuf    *bufio.Writer
 	n     int64
-	c     int
 	crc32 hash.Hash
+
+	segmentSize int64
 }
 
-func newChunkWriter(w io.Writer) *chunkWriter {
-	return &chunkWriter{
-		ow:    w,
-		w:     bufio.NewWriterSize(w, 1*1024*1024),
+const (
+	defaultChunkSegmentSize = 512 * 1024 * 1024
+
+	chunksFormatV1 = 1
+	indexFormatV1  = 1
+)
+
+func newChunkWriter(dir string) (*chunkWriter, error) {
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return nil, err
+	}
+	dirFile, err := fileutil.OpenDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	cw := &chunkWriter{
+		dirFile: dirFile,
 		n:     0,
 		crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
+		segmentSize: defaultChunkSegmentSize,
 	}
+	return cw, nil
+}
+
+func (w *chunkWriter) tail() *os.File {
+	if len(w.files) == 0 {
+		return nil
+	}
+	return w.files[len(w.files)-1]
+}
+
+// finalizeTail writes all pending data to the current tail file,
+// truncates its size, and closes it.
+func (w *chunkWriter) finalizeTail() error {
+	tf := w.tail()
+	if tf == nil {
+		return nil
+	}
+
+	if err := w.wbuf.Flush(); err != nil {
+		return err
+	}
+	if err := fileutil.Fsync(tf); err != nil {
+		return err
+	}
+	// As the file was pre-allocated, we truncate any superfluous zero bytes.
+	off, err := tf.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	if err := tf.Truncate(off); err != nil {
+		return err
+	}
+	return tf.Close()
+}
+
+func (w *chunkWriter) cut() error {
+	// Sync current tail to disk and close.
+	w.finalizeTail()
+
+	p, _, err := nextSequenceFile(w.dirFile.Name(), "")
+	if err != nil {
+		return err
+	}
+	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
+	if err != nil {
+		return err
+	}
+	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
+		return err
+	}
+	if err = w.dirFile.Sync(); err != nil {
+		return err
+	}
+
+	// Write header metadata for new file.
+
+	metab := make([]byte, 8)
+	binary.BigEndian.PutUint32(metab[:4], MagicSeries)
+	metab[4] = chunksFormatV1
+
+	if _, err := f.Write(metab); err != nil {
+		return err
+	}
+
+	w.files = append(w.files, f)
+	if w.wbuf != nil {
+		w.wbuf.Reset(f)
+	} else {
+		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
+	}
+	w.n = 8
+
+	return nil
 }
 
 func (w *chunkWriter) write(wr io.Writer, b []byte) error {
@@ -66,44 +155,40 @@ func (w *chunkWriter) write(wr io.Writer, b []byte) error {
 	return err
 }
 
-func (w *chunkWriter) writeMeta() error {
-	b := [8]byte{}
-
-	binary.BigEndian.PutUint32(b[:4], MagicSeries)
-	b[4] = flagStd
-
-	return w.write(w.w, b[:])
-}
-
 func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
-	// Initialize with meta data.
-	if w.n == 0 {
-		if err := w.writeMeta(); err != nil {
+	// Calculate maximum space we need and cut a new segment in case
+	// we don't fit into the current one.
+	maxLen := int64(binary.MaxVarintLen32)
+	for _, c := range chks {
+		maxLen += binary.MaxVarintLen32 + 1
+		maxLen += int64(len(c.Chunk.Bytes()))
+	}
+	newsz := w.n + maxLen
+
+	if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
+		if err := w.cut(); err != nil {
 			return err
 		}
 	}
 
+	// Write chunks sequentially and set the reference field in the ChunkMeta.
 	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.w)
+	wr := io.MultiWriter(w.crc32, w.wbuf)
 
-	// For normal reads we don't need the number of the chunk section but
-	// it allows us to verify checksums without reading the index file.
-	// The offsets are also technically enough to calculate chunk size. but
-	// holding the length of each chunk could later allow for adding padding
-	// between chunks.
-	b := [binary.MaxVarintLen32]byte{}
-	n := binary.PutUvarint(b[:], uint64(len(chks)))
+	b := make([]byte, binary.MaxVarintLen32)
+	n := binary.PutUvarint(b, uint64(len(chks)))
 
 	if err := w.write(wr, b[:n]); err != nil {
 		return err
 	}
+	seq := uint64(w.seq()) << 32
 
 	for i := range chks {
 		chk := &chks[i]
 
-		chk.Ref = uint64(w.n)
+		chk.Ref = seq | uint64(w.n)
 
-		n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
+		n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
 
 		if err := w.write(wr, b[:n]); err != nil {
 			return err
@@ -117,24 +202,18 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
 		chk.Chunk = nil
 	}
 
-	if err := w.write(w.w, w.crc32.Sum(nil)); err != nil {
+	if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
 		return err
 	}
 	return nil
 }
 
-func (w *chunkWriter) Size() int64 {
-	return w.n
+func (w *chunkWriter) seq() int {
+	return len(w.files) - 1
 }
 
 func (w *chunkWriter) Close() error {
-	// Initialize block in case no data was written to it.
-	if w.n == 0 {
-		if err := w.writeMeta(); err != nil {
-			return err
-		}
-	}
-	return w.w.Flush()
+	return w.finalizeTail()
 }
 
 // ChunkMeta holds information about a chunk of data.
@@ -155,7 +234,7 @@ type IndexWriter interface {
 	// of chunks that the index can reference.
 	// The reference number is used to resolve a series against the postings
 	// list iterator. It only has to be available during the write processing.
-	AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta)
+	AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error
 
 	// WriteLabelIndex serializes an index from label names to values.
 	// The passed in values chained tuples of strings of the length of names.
@@ -164,9 +243,6 @@ type IndexWriter interface {
 	// WritePostings writes a postings list for a single label pair.
 	WritePostings(name, value string, it Postings) error
 
-	// Size returns the size of the data written so far.
-	Size() int64
-
 	// Close writes any finalization and closes theresources associated with
 	// the underlying writer.
 	Close() error
@@ -181,13 +257,12 @@ type indexWriterSeries struct {
 // indexWriter implements the IndexWriter interface for the standard
 // serialization format.
 type indexWriter struct {
-	ow io.Writer
-	w  *bufio.Writer
+	f    *os.File
+	bufw *bufio.Writer
 	n       int64
 	started bool
 
 	series map[uint32]*indexWriterSeries
 
 	symbols      map[string]uint32 // symbol offsets
 	labelIndexes []hashEntry       // label index offsets
 	postings     []hashEntry       // postings lists offsets
@@ -195,15 +270,31 @@ type indexWriter struct {
 	crc32 hash.Hash
 }
 
-func newIndexWriter(w io.Writer) *indexWriter {
-	return &indexWriter{
-		w:  bufio.NewWriterSize(w, 1*1024*1024),
-		ow: w,
+func newIndexWriter(dir string) (*indexWriter, error) {
+	df, err := fileutil.OpenDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666)
+	if err != nil {
+		return nil, err
+	}
+	if err := fileutil.Fsync(df); err != nil {
+		return nil, errors.Wrap(err, "sync dir")
+	}
+
+	iw := &indexWriter{
+		f:    f,
+		bufw: bufio.NewWriterSize(f, 1*1024*1024),
 		n:       0,
 		symbols: make(map[string]uint32, 4096),
 		series:  make(map[uint32]*indexWriterSeries, 4096),
 		crc32:   crc32.New(crc32.MakeTable(crc32.Castagnoli)),
 	}
+	if err := iw.writeMeta(); err != nil {
+		return nil, err
+	}
+	return iw, nil
 }
 
 func (w *indexWriter) write(wr io.Writer, b []byte) error {
@@ -215,7 +306,7 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error {
 // section writes a CRC32 checksummed section of length l and guarded by flag.
 func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error {
 	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.w)
+	wr := io.MultiWriter(w.crc32, w.bufw)
 
 	b := [5]byte{flag, 0, 0, 0, 0}
 	binary.BigEndian.PutUint32(b[1:], l)
@@ -225,9 +316,9 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er
 	}
 
 	if err := f(wr); err != nil {
-		return errors.Wrap(err, "contents write func")
+		return errors.Wrap(err, "write contents")
 	}
-	if err := w.write(w.w, w.crc32.Sum(nil)); err != nil {
+	if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
 		return errors.Wrap(err, "writing checksum")
 	}
 	return nil
@@ -239,10 +330,13 @@ func (w *indexWriter) writeMeta() error {
 	binary.BigEndian.PutUint32(b[:4], MagicIndex)
 	b[4] = flagStd
 
-	return w.write(w.w, b[:])
+	return w.write(w.bufw, b[:])
}
 
-func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) {
+func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
+	if _, ok := w.series[ref]; ok {
+		return errors.Errorf("series with reference %d already added", ref)
+	}
 	// Populate the symbol table from all label sets we have to reference.
 	for _, l := range lset {
 		w.symbols[l.Name] = 0
@@ -253,6 +347,7 @@ func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkM
 		labels: lset,
 		chunks: chunks,
 	}
+	return nil
 }
 
 func (w *indexWriter) writeSymbols() error {
@@ -340,9 +435,6 @@ func (w *indexWriter) writeSeries() error {
 }
 
 func (w *indexWriter) init() error {
-	if err := w.writeMeta(); err != nil {
-		return err
-	}
 	if err := w.writeSymbols(); err != nil {
 		return err
 	}
@@ -439,10 +531,6 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
 	})
 }
 
-func (w *indexWriter) Size() int64 {
-	return w.n
-}
-
 type hashEntry struct {
 	name   string
 	offset uint32
@@ -482,24 +570,22 @@ func (w *indexWriter) finalize() error {
 	// for any index query.
 	// TODO(fabxc): also store offset to series section to allow plain
 	// iteration over all existing series?
-	// TODO(fabxc): store references like these that are not resolved via direct
-	// mmap using explicit endianness?
 	b := [8]byte{}
 	binary.BigEndian.PutUint32(b[:4], lo)
 	binary.BigEndian.PutUint32(b[4:], po)
 
-	return w.write(w.w, b[:])
+	return w.write(w.bufw, b[:])
 }
 
 func (w *indexWriter) Close() error {
-	// Handle blocks without any data.
-	if !w.started {
-		if err := w.init(); err != nil {
-			return err
-		}
-	}
 	if err := w.finalize(); err != nil {
 		return err
 	}
-	return w.w.Flush()
+	if err := w.bufw.Flush(); err != nil {
+		return err
+	}
+	if err := fileutil.Fsync(w.f); err != nil {
+		return err
+	}
+	return w.f.Close()
 }
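The cut decision in WriteChunks is worth unpacking: a new segment is started when no segment is open yet, when the current one has already passed the size limit, or when the batch would overflow the current segment but could still fit into a fresh one. A batch larger than a whole segment deliberately spills into the open segment rather than forcing a cut that could never help. A small sketch isolating that predicate; the segment size mirrors defaultChunkSegmentSize and the bool stands in for the `w.wbuf == nil` check:

```go
package main

import "fmt"

const segmentSize = 512 * 1024 * 1024 // mirrors defaultChunkSegmentSize

// shouldCut reproduces the condition used by chunkWriter.WriteChunks:
// open stands in for w.wbuf != nil, n is the bytes already written to
// the current segment, maxLen is the upper bound for the incoming batch.
func shouldCut(open bool, n, maxLen int64) bool {
	return !open || n > segmentSize || n+maxLen > segmentSize && maxLen <= segmentSize
}

func main() {
	fmt.Println(shouldCut(false, 0, 100))             // true: nothing open yet
	fmt.Println(shouldCut(true, segmentSize-50, 100)) // true: batch fits a fresh segment
	fmt.Println(shouldCut(true, 8, 100))              // false: plenty of room
	fmt.Println(shouldCut(true, 8, segmentSize+1))    // false: oversized batch spills
}
```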
vendor/vendor.json (vendored): 6 changes

@@ -368,10 +368,10 @@
 			"revisionTime": "2016-09-30T00:14:02Z"
 		},
 		{
-			"checksumSHA1": "edB8coiX4s6hf6BZuYE5+MPJYX8=",
+			"checksumSHA1": "UeErAjCWDt1vzaXGQGxnenFUg7w=",
 			"path": "github.com/fabxc/tsdb",
-			"revision": "f734773214e1bcb7962d92155863110d01214db5",
-			"revisionTime": "2017-02-19T12:01:19Z"
+			"revision": "db5c88ea9ac9400b0e58ef18b9b272c34b98639b",
+			"revisionTime": "2017-02-28T07:40:51Z"
 		},
 		{
 			"checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",