From 87805fb83f54bc37dc875a64012da45ac725538b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 6 Mar 2017 17:34:49 +0100 Subject: [PATCH] Remove Partitioned* code --- cmd/tsdb/main.go | 23 --------- db.go | 127 +---------------------------------------------- head.go | 2 +- querier.go | 63 ----------------------- 4 files changed, 2 insertions(+), 213 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 437f4ce8f8..5755824319 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -122,13 +122,6 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { } }) - defer func() { - reportSize(dir) - if b.cleanup { - os.RemoveAll(b.outPath) - } - }() - var total uint64 dur := measureTime("ingestScrapes", func() { @@ -282,22 +275,6 @@ func (b *writeBenchmark) stopProfiling() { } } -func reportSize(dir string) { - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil || path == dir { - return err - } - if info.Size() < 10*1024*1024 { - return nil - } - fmt.Printf(" > file=%s size=%.04fGiB\n", path[len(dir):], float64(info.Size())/1024/1024/1024) - return nil - }) - if err != nil { - exitWithError(err) - } -} - func measureTime(stage string, f func()) time.Duration { fmt.Printf(">> start stage=%s\n", stage) start := time.Now() diff --git a/db.go b/db.go index 4b619fd4e9..0811783609 100644 --- a/db.go +++ b/db.go @@ -6,10 +6,8 @@ import ( "fmt" "io" "io/ioutil" - "math" "os" "path/filepath" - "reflect" "strconv" "strings" "sync" @@ -706,123 +704,6 @@ func nextSequenceFile(dir, prefix string) (string, int, error) { return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil } -// PartitionedDB is a time series storage. -type PartitionedDB struct { - logger log.Logger - dir string - - partitionPow uint - Partitions []*DB -} - -func isPowTwo(x int) bool { - return x > 0 && (x&(x-1)) == 0 -} - -// OpenPartitioned or create a new DB. -func OpenPartitioned(dir string, n int, l log.Logger, r prometheus.Registerer, opts *Options) (*PartitionedDB, error) { - if !isPowTwo(n) { - return nil, errors.Errorf("%d is not a power of two", n) - } - if opts == nil { - opts = DefaultOptions - } - if l == nil { - l = log.NewLogfmtLogger(os.Stdout) - l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) - } - - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, err - } - c := &PartitionedDB{ - logger: l, - dir: dir, - partitionPow: uint(math.Log2(float64(n))), - } - - // Initialize vertical partitiondb. - // TODO(fabxc): validate partition number to be power of 2, which is required - // for the bitshift-modulo when finding the right partition. - for i := 0; i < n; i++ { - l := log.NewContext(l).With("partition", i) - d := partitionDir(dir, i) - - s, err := Open(d, l, r, opts) - if err != nil { - return nil, fmt.Errorf("initializing partition %q failed: %s", d, err) - } - - c.Partitions = append(c.Partitions, s) - } - - return c, nil -} - -func partitionDir(base string, i int) string { - return filepath.Join(base, fmt.Sprintf("p-%0.4d", i)) -} - -// Close the database. -func (db *PartitionedDB) Close() error { - var g errgroup.Group - - for _, partition := range db.Partitions { - g.Go(partition.Close) - } - - return g.Wait() -} - -// Appender returns a new appender against the database. -func (db *PartitionedDB) Appender() Appender { - app := &partitionedAppender{db: db} - - for _, p := range db.Partitions { - app.partitions = append(app.partitions, p.Appender().(*dbAppender)) - } - return app -} - -type partitionedAppender struct { - db *PartitionedDB - partitions []*dbAppender -} - -func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - h := lset.Hash() - p := h >> (64 - a.db.partitionPow) - - ref, err := a.partitions[p].Add(lset, t, v) - if err != nil { - return 0, err - } - return ref | (p << 48), nil -} - -func (a *partitionedAppender) AddFast(ref uint64, t int64, v float64) error { - p := uint8((ref << 8) >> 56) - return a.partitions[p].AddFast(ref, t, v) -} - -func (a *partitionedAppender) Commit() error { - var merr MultiError - - for _, p := range a.partitions { - merr.Add(p.Commit()) - } - return merr.Err() -} - -func (a *partitionedAppender) Rollback() error { - var merr MultiError - - for _, p := range a.partitions { - merr.Add(p.Rollback()) - } - return merr.Err() -} - // The MultiError type implements the error interface, and contains the // Errors used to construct it. type MultiError []error @@ -866,13 +747,7 @@ func (es MultiError) Err() error { } func yoloString(b []byte) string { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - - h := reflect.StringHeader{ - Data: sh.Data, - Len: sh.Len, - } - return *((*string)(unsafe.Pointer(&h))) + return *((*string)(unsafe.Pointer(&b))) } func closeAll(cs ...io.Closer) error { diff --git a/head.go b/head.go index 6d70d56846..2a95993864 100644 --- a/head.go +++ b/head.go @@ -584,7 +584,7 @@ type memSeries struct { lastValue float64 sampleBuf [4]sample - app chunks.Appender // Current appender for the chunkdb. + app chunks.Appender // Current appender for the chunk. } func (s *memSeries) cut() *memChunk { diff --git a/querier.go b/querier.go index 7783ef3120..9b614e1845 100644 --- a/querier.go +++ b/querier.go @@ -233,69 +233,6 @@ func (q *blockQuerier) Close() error { return nil } -// partitionedQuerier merges query results from a set of partition querieres. -type partitionedQuerier struct { - mint, maxt int64 - partitions []Querier -} - -// Querier returns a new querier over the database for the given -// time range. -func (db *PartitionedDB) Querier(mint, maxt int64) Querier { - q := &partitionedQuerier{ - mint: mint, - maxt: maxt, - } - for _, s := range db.Partitions { - q.partitions = append(q.partitions, s.Querier(mint, maxt)) - } - - return q -} - -func (q *partitionedQuerier) Select(ms ...labels.Matcher) SeriesSet { - // We gather the non-overlapping series from every partition and simply - // return their union. - r := &mergedSeriesSet{} - - for _, s := range q.partitions { - r.sets = append(r.sets, s.Select(ms...)) - } - if len(r.sets) == 0 { - return nopSeriesSet{} - } - return r -} - -func (q *partitionedQuerier) LabelValues(n string) ([]string, error) { - res, err := q.partitions[0].LabelValues(n) - if err != nil { - return nil, err - } - for _, sq := range q.partitions[1:] { - pr, err := sq.LabelValues(n) - if err != nil { - return nil, err - } - // Merge new values into deduplicated result. - res = mergeStrings(res, pr) - } - return res, nil -} - -func (q *partitionedQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { - return nil, fmt.Errorf("not implemented") -} - -func (q *partitionedQuerier) Close() error { - var merr MultiError - - for _, sq := range q.partitions { - merr.Add(sq.Close()) - } - return merr.Err() -} - func mergeStrings(a, b []string) []string { maxl := len(a) if len(b) > len(a) {