Remove Partitioned* code

This commit is contained in:
Fabian Reinartz 2017-03-06 17:34:49 +01:00
parent eedbebe1d7
commit 87805fb83f
4 changed files with 2 additions and 213 deletions

View file

@ -122,13 +122,6 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
}
})
defer func() {
reportSize(dir)
if b.cleanup {
os.RemoveAll(b.outPath)
}
}()
var total uint64
dur := measureTime("ingestScrapes", func() {
@ -282,22 +275,6 @@ func (b *writeBenchmark) stopProfiling() {
}
}
func reportSize(dir string) {
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil || path == dir {
return err
}
if info.Size() < 10*1024*1024 {
return nil
}
fmt.Printf(" > file=%s size=%.04fGiB\n", path[len(dir):], float64(info.Size())/1024/1024/1024)
return nil
})
if err != nil {
exitWithError(err)
}
}
func measureTime(stage string, f func()) time.Duration {
fmt.Printf(">> start stage=%s\n", stage)
start := time.Now()

127
db.go
View file

@ -6,10 +6,8 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
@ -706,123 +704,6 @@ func nextSequenceFile(dir, prefix string) (string, int, error) {
return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil
}
// PartitionedDB is a time series storage.
type PartitionedDB struct {
logger log.Logger
dir string
partitionPow uint
Partitions []*DB
}
func isPowTwo(x int) bool {
return x > 0 && (x&(x-1)) == 0
}
// OpenPartitioned or create a new DB.
func OpenPartitioned(dir string, n int, l log.Logger, r prometheus.Registerer, opts *Options) (*PartitionedDB, error) {
if !isPowTwo(n) {
return nil, errors.Errorf("%d is not a power of two", n)
}
if opts == nil {
opts = DefaultOptions
}
if l == nil {
l = log.NewLogfmtLogger(os.Stdout)
l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
}
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
c := &PartitionedDB{
logger: l,
dir: dir,
partitionPow: uint(math.Log2(float64(n))),
}
// Initialize vertical partitiondb.
// TODO(fabxc): validate partition number to be power of 2, which is required
// for the bitshift-modulo when finding the right partition.
for i := 0; i < n; i++ {
l := log.NewContext(l).With("partition", i)
d := partitionDir(dir, i)
s, err := Open(d, l, r, opts)
if err != nil {
return nil, fmt.Errorf("initializing partition %q failed: %s", d, err)
}
c.Partitions = append(c.Partitions, s)
}
return c, nil
}
func partitionDir(base string, i int) string {
return filepath.Join(base, fmt.Sprintf("p-%0.4d", i))
}
// Close the database.
func (db *PartitionedDB) Close() error {
var g errgroup.Group
for _, partition := range db.Partitions {
g.Go(partition.Close)
}
return g.Wait()
}
// Appender returns a new appender against the database.
func (db *PartitionedDB) Appender() Appender {
app := &partitionedAppender{db: db}
for _, p := range db.Partitions {
app.partitions = append(app.partitions, p.Appender().(*dbAppender))
}
return app
}
type partitionedAppender struct {
db *PartitionedDB
partitions []*dbAppender
}
func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
h := lset.Hash()
p := h >> (64 - a.db.partitionPow)
ref, err := a.partitions[p].Add(lset, t, v)
if err != nil {
return 0, err
}
return ref | (p << 48), nil
}
func (a *partitionedAppender) AddFast(ref uint64, t int64, v float64) error {
p := uint8((ref << 8) >> 56)
return a.partitions[p].AddFast(ref, t, v)
}
func (a *partitionedAppender) Commit() error {
var merr MultiError
for _, p := range a.partitions {
merr.Add(p.Commit())
}
return merr.Err()
}
func (a *partitionedAppender) Rollback() error {
var merr MultiError
for _, p := range a.partitions {
merr.Add(p.Rollback())
}
return merr.Err()
}
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
@ -866,13 +747,7 @@ func (es MultiError) Err() error {
}
func yoloString(b []byte) string {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
h := reflect.StringHeader{
Data: sh.Data,
Len: sh.Len,
}
return *((*string)(unsafe.Pointer(&h)))
return *((*string)(unsafe.Pointer(&b)))
}
func closeAll(cs ...io.Closer) error {

View file

@ -584,7 +584,7 @@ type memSeries struct {
lastValue float64
sampleBuf [4]sample
app chunks.Appender // Current appender for the chunkdb.
app chunks.Appender // Current appender for the chunk.
}
func (s *memSeries) cut() *memChunk {

View file

@ -233,69 +233,6 @@ func (q *blockQuerier) Close() error {
return nil
}
// partitionedQuerier merges query results from a set of partition querieres.
type partitionedQuerier struct {
mint, maxt int64
partitions []Querier
}
// Querier returns a new querier over the database for the given
// time range.
func (db *PartitionedDB) Querier(mint, maxt int64) Querier {
q := &partitionedQuerier{
mint: mint,
maxt: maxt,
}
for _, s := range db.Partitions {
q.partitions = append(q.partitions, s.Querier(mint, maxt))
}
return q
}
func (q *partitionedQuerier) Select(ms ...labels.Matcher) SeriesSet {
// We gather the non-overlapping series from every partition and simply
// return their union.
r := &mergedSeriesSet{}
for _, s := range q.partitions {
r.sets = append(r.sets, s.Select(ms...))
}
if len(r.sets) == 0 {
return nopSeriesSet{}
}
return r
}
func (q *partitionedQuerier) LabelValues(n string) ([]string, error) {
res, err := q.partitions[0].LabelValues(n)
if err != nil {
return nil, err
}
for _, sq := range q.partitions[1:] {
pr, err := sq.LabelValues(n)
if err != nil {
return nil, err
}
// Merge new values into deduplicated result.
res = mergeStrings(res, pr)
}
return res, nil
}
func (q *partitionedQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *partitionedQuerier) Close() error {
var merr MultiError
for _, sq := range q.partitions {
merr.Add(sq.Close())
}
return merr.Err()
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {