Rename shard to partition

This commit is contained in:
Fabian Reinartz 2017-01-06 08:08:02 +01:00
parent 9790aa98ac
commit 4590b61343
4 changed files with 103 additions and 103 deletions

View file

@ -31,31 +31,31 @@ type compactorMetrics struct {
}
func newCompactorMetrics(i int) *compactorMetrics {
shardLabel := prometheus.Labels{
"shard": fmt.Sprintf("%d", i),
partitionLabel := prometheus.Labels{
"partition": fmt.Sprintf("%d", i),
}
m := &compactorMetrics{}
m.triggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_compactions_triggered_total",
Help: "Total number of triggered compactions for the shard.",
ConstLabels: shardLabel,
Name: "tsdb_partition_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
ConstLabels: partitionLabel,
})
m.ran = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_compactions_total",
Help: "Total number of compactions that were executed for the shard.",
ConstLabels: shardLabel,
Name: "tsdb_partition_compactions_total",
Help: "Total number of compactions that were executed for the partition.",
ConstLabels: partitionLabel,
})
m.failed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_compactions_failed_total",
Help: "Total number of compactions that failed for the shard.",
ConstLabels: shardLabel,
Name: "tsdb_partition_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
ConstLabels: partitionLabel,
})
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_shard_compaction_duration",
Name: "tsdb_partition_compaction_duration",
Help: "Duration of compaction runs.",
ConstLabels: shardLabel,
ConstLabels: partitionLabel,
})
return m

110
db.go
View file

@ -41,14 +41,14 @@ type DB struct {
opts *Options
path string
shards []*Shard
partitions []*Partition
}
// TODO(fabxc): make configurable
const (
shardShift = 0
numShards = 1 << shardShift
maxChunkSize = 1024
partitionShift = 0
numPartitions = 1 << partitionShift
maxChunkSize = 1024
)
// Open or create a new DB.
@ -70,25 +70,25 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
path: path,
}
// Initialize vertical shards.
// TODO(fabxc): validate shard number to be power of 2, which is required
// for the bitshift-modulo when finding the right shard.
for i := 0; i < numShards; i++ {
l := log.NewContext(l).With("shard", i)
d := shardDir(path, i)
// Initialize vertical partitions.
// TODO(fabxc): validate partition number to be power of 2, which is required
// for the bitshift-modulo when finding the right partition.
for i := 0; i < numPartitions; i++ {
l := log.NewContext(l).With("partition", i)
d := partitionDir(path, i)
s, err := OpenShard(d, i, l)
s, err := OpenPartition(d, i, l)
if err != nil {
return nil, fmt.Errorf("initializing shard %q failed: %s", d, err)
return nil, fmt.Errorf("initializing partition %q failed: %s", d, err)
}
c.shards = append(c.shards, s)
c.partitions = append(c.partitions, s)
}
return c, nil
}
func shardDir(base string, i int) string {
func partitionDir(base string, i int) string {
return filepath.Join(base, strconv.Itoa(i))
}
@ -96,8 +96,8 @@ func shardDir(base string, i int) string {
func (db *DB) Close() error {
var g errgroup.Group
for _, shard := range db.shards {
g.Go(shard.Close)
for _, partition := range db.partitions {
g.Go(partition.Close)
}
return g.Wait()
@ -122,7 +122,7 @@ type Appender interface {
func (db *DB) Appender() Appender {
return &bucketAppender{
db: db,
buckets: make([][]hashedSample, numShards),
buckets: make([][]hashedSample, numPartitions),
}
}
@ -133,7 +133,7 @@ type bucketAppender struct {
func (ba *bucketAppender) Add(lset labels.Labels, t int64, v float64) error {
h := lset.Hash()
s := h >> (64 - shardShift)
s := h >> (64 - partitionShift)
ba.buckets[s] = append(ba.buckets[s], hashedSample{
hash: h,
@ -156,9 +156,9 @@ func (ba *bucketAppender) Commit() error {
var merr MultiError
// Spill buckets into shards.
// Spill buckets into partitions.
for s, b := range ba.buckets {
merr.Add(ba.db.shards[s].appendBatch(b))
merr.Add(ba.db.partitions[s].appendBatch(b))
}
return merr.Err()
}
@ -174,12 +174,12 @@ type hashedSample struct {
const sep = '\xff'
// Shard handles reads and writes of time series falling into
// a hashed shard of a series.
type Shard struct {
// Partition handles reads and writes of time series falling into
// a hashed partition of a series.
type Partition struct {
path string
logger log.Logger
metrics *shardMetrics
metrics *partitionMetrics
mtx sync.RWMutex
persisted []*persistedBlock
@ -190,34 +190,34 @@ type Shard struct {
cutc chan struct{}
}
type shardMetrics struct {
type partitionMetrics struct {
persistences prometheus.Counter
persistenceDuration prometheus.Histogram
samplesAppended prometheus.Counter
}
func newShardMetrics(r prometheus.Registerer, i int) *shardMetrics {
shardLabel := prometheus.Labels{
"shard": fmt.Sprintf("%d", i),
func newPartitionMetrics(r prometheus.Registerer, i int) *partitionMetrics {
partitionLabel := prometheus.Labels{
"partition": fmt.Sprintf("%d", i),
}
m := &shardMetrics{}
m := &partitionMetrics{}
m.persistences = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_persistences_total",
Name: "tsdb_partition_persistences_total",
Help: "Total number of head persistances that ran so far.",
ConstLabels: shardLabel,
ConstLabels: partitionLabel,
})
m.persistenceDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_shard_persistence_duration_seconds",
Name: "tsdb_partition_persistence_duration_seconds",
Help: "Duration of persistences in seconds.",
ConstLabels: shardLabel,
ConstLabels: partitionLabel,
Buckets: prometheus.ExponentialBuckets(0.25, 2, 5),
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_samples_appended_total",
Help: "Total number of appended samples for the shard.",
ConstLabels: shardLabel,
Name: "tsdb_partition_samples_appended_total",
Help: "Total number of appended samples for the partition.",
ConstLabels: partitionLabel,
})
if r != nil {
@ -230,9 +230,9 @@ func newShardMetrics(r prometheus.Registerer, i int) *shardMetrics {
return m
}
// OpenShard returns a new Shard.
func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
// Create directory if shard is new.
// OpenPartition returns a new Partition.
func OpenPartition(path string, i int, logger log.Logger) (*Partition, error) {
// Create directory if partition is new.
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
@ -258,10 +258,10 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
heads = []*HeadBlock{head}
}
s := &Shard{
s := &Partition{
path: path,
logger: logger,
metrics: newShardMetrics(nil, i),
metrics: newPartitionMetrics(nil, i),
heads: heads,
persisted: persisted,
cutc: make(chan struct{}, 1),
@ -275,7 +275,7 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
return s, nil
}
func (s *Shard) run() {
func (s *Partition) run() {
for range s.cutc {
// if err := s.cut(); err != nil {
// s.logger.Log("msg", "cut error", "err", err)
@ -296,8 +296,8 @@ func (s *Shard) run() {
close(s.donec)
}
// Close the shard.
func (s *Shard) Close() error {
// Close the partition.
func (s *Partition) Close() error {
close(s.cutc)
<-s.donec
@ -317,7 +317,7 @@ func (s *Shard) Close() error {
return merr.Err()
}
func (s *Shard) appendBatch(samples []hashedSample) error {
func (s *Partition) appendBatch(samples []hashedSample) error {
if len(samples) == 0 {
return nil
}
@ -344,11 +344,11 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
return err
}
func (s *Shard) lock() sync.Locker {
func (s *Partition) lock() sync.Locker {
return &s.mtx
}
func (s *Shard) headForDir(dir string) (int, bool) {
func (s *Partition) headForDir(dir string) (int, bool) {
for i, b := range s.heads {
if b.dir() == dir {
return i, true
@ -357,7 +357,7 @@ func (s *Shard) headForDir(dir string) (int, bool) {
return -1, false
}
func (s *Shard) persistedForDir(dir string) (int, bool) {
func (s *Partition) persistedForDir(dir string) (int, bool) {
for i, b := range s.persisted {
if b.dir() == dir {
return i, true
@ -366,7 +366,7 @@ func (s *Shard) persistedForDir(dir string) (int, bool) {
return -1, false
}
func (s *Shard) reinit(dir string) error {
func (s *Partition) reinit(dir string) error {
if !fileutil.Exist(dir) {
if i, ok := s.headForDir(dir); ok {
if err := s.heads[i].Close(); err != nil {
@ -410,7 +410,7 @@ func (s *Shard) reinit(dir string) error {
return nil
}
func (s *Shard) compactable() []block {
func (s *Partition) compactable() []block {
var blocks []block
for _, pb := range s.persisted {
blocks = append([]block{pb}, blocks...)
@ -444,9 +444,9 @@ func intervalContains(min, max, t int64) bool {
return t >= min && t <= max
}
// blocksForRange returns all blocks within the shard that may contain
// blocksForRange returns all blocks within the partition that may contain
// data for the given time range.
func (s *Shard) blocksForInterval(mint, maxt int64) []block {
func (s *Partition) blocksForInterval(mint, maxt int64) []block {
var bs []block
for _, b := range s.persisted {
@ -472,7 +472,7 @@ const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (s *Shard) cut() error {
func (s *Partition) cut() error {
// Set new head block.
head := s.heads[len(s.heads)-1]
@ -487,7 +487,7 @@ func (s *Shard) cut() error {
return nil
}
// func (s *Shard) persist() error {
// func (s *Partition) persist() error {
// s.mtx.Lock()
// // Set new head block.
@ -501,7 +501,7 @@ func (s *Shard) cut() error {
// s.mtx.Unlock()
// // TODO(fabxc): add grace period where we can still append to old head shard
// // TODO(fabxc): add grace period where we can still append to old head partition
// // before actually persisting it.
// dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))

View file

@ -35,10 +35,10 @@ type Series interface {
Iterator() SeriesIterator
}
// querier merges query results from a set of shard querieres.
// querier merges query results from a set of partition querieres.
type querier struct {
mint, maxt int64
shards []Querier
partitions []Querier
}
// Querier returns a new querier over the database for the given
@ -48,19 +48,19 @@ func (db *DB) Querier(mint, maxt int64) Querier {
mint: mint,
maxt: maxt,
}
for _, s := range db.shards {
q.shards = append(q.shards, s.Querier(mint, maxt))
for _, s := range db.partitions {
q.partitions = append(q.partitions, s.Querier(mint, maxt))
}
return q
}
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
// We gather the non-overlapping series from every shard and simply
// We gather the non-overlapping series from every partition and simply
// return their union.
r := &mergedSeriesSet{}
for _, s := range q.shards {
for _, s := range q.partitions {
r.sets = append(r.sets, s.Select(ms...))
}
if len(r.sets) == 0 {
@ -70,11 +70,11 @@ func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
}
func (q *querier) LabelValues(n string) ([]string, error) {
res, err := q.shards[0].LabelValues(n)
res, err := q.partitions[0].LabelValues(n)
if err != nil {
return nil, err
}
for _, sq := range q.shards[1:] {
for _, sq := range q.partitions[1:] {
pr, err := sq.LabelValues(n)
if err != nil {
return nil, err
@ -120,29 +120,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
func (q *querier) Close() error {
var merr MultiError
for _, sq := range q.shards {
for _, sq := range q.partitions {
merr.Add(sq.Close())
}
return merr.Err()
}
// shardQuerier aggregates querying results from time blocks within
// a single shard.
type shardQuerier struct {
shard *Shard
blocks []Querier
// partitionQuerier aggregates querying results from time blocks within
// a single partition.
type partitionQuerier struct {
partition *Partition
blocks []Querier
}
// Querier returns a new querier over the data shard for the given
// Querier returns a new querier over the data partition for the given
// time range.
func (s *Shard) Querier(mint, maxt int64) Querier {
func (s *Partition) Querier(mint, maxt int64) Querier {
s.mtx.RLock()
blocks := s.blocksForInterval(mint, maxt)
sq := &shardQuerier{
blocks: make([]Querier, 0, len(blocks)),
shard: s,
sq := &partitionQuerier{
blocks: make([]Querier, 0, len(blocks)),
partition: s,
}
for _, b := range blocks {
@ -163,7 +163,7 @@ func (s *Shard) Querier(mint, maxt int64) Querier {
return sq
}
func (q *shardQuerier) LabelValues(n string) ([]string, error) {
func (q *partitionQuerier) LabelValues(n string) ([]string, error) {
res, err := q.blocks[0].LabelValues(n)
if err != nil {
return nil, err
@ -179,11 +179,11 @@ func (q *shardQuerier) LabelValues(n string) ([]string, error) {
return res, nil
}
func (q *shardQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
func (q *partitionQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *shardQuerier) Select(ms ...labels.Matcher) SeriesSet {
func (q *partitionQuerier) Select(ms ...labels.Matcher) SeriesSet {
// Sets from different blocks have no time overlap. The reference numbers
// they emit point to series sorted in lexicographic order.
// We can fully connect partial series by simply comparing with the previous
@ -194,18 +194,18 @@ func (q *shardQuerier) Select(ms ...labels.Matcher) SeriesSet {
r := q.blocks[0].Select(ms...)
for _, s := range q.blocks[1:] {
r = newShardSeriesSet(r, s.Select(ms...))
r = newPartitionSeriesSet(r, s.Select(ms...))
}
return r
}
func (q *shardQuerier) Close() error {
func (q *partitionQuerier) Close() error {
var merr MultiError
for _, bq := range q.blocks {
merr.Add(bq.Close())
}
q.shard.mtx.RUnlock()
q.partition.mtx.RUnlock()
return merr.Err()
}
@ -359,15 +359,15 @@ func (s *mergedSeriesSet) Next() bool {
return s.Next()
}
type shardSeriesSet struct {
type partitionSeriesSet struct {
a, b SeriesSet
cur Series
adone, bdone bool
}
func newShardSeriesSet(a, b SeriesSet) *shardSeriesSet {
s := &shardSeriesSet{a: a, b: b}
func newPartitionSeriesSet(a, b SeriesSet) *partitionSeriesSet {
s := &partitionSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
// one element look-ahead.
s.adone = !s.a.Next()
@ -376,18 +376,18 @@ func newShardSeriesSet(a, b SeriesSet) *shardSeriesSet {
return s
}
func (s *shardSeriesSet) At() Series {
func (s *partitionSeriesSet) At() Series {
return s.cur
}
func (s *shardSeriesSet) Err() error {
func (s *partitionSeriesSet) Err() error {
if s.a.Err() != nil {
return s.a.Err()
}
return s.b.Err()
}
func (s *shardSeriesSet) compare() int {
func (s *partitionSeriesSet) compare() int {
if s.adone {
return 1
}
@ -397,7 +397,7 @@ func (s *shardSeriesSet) compare() int {
return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
}
func (s *shardSeriesSet) Next() bool {
func (s *partitionSeriesSet) Next() bool {
if s.adone && s.bdone || s.Err() != nil {
return false
}

View file

@ -65,7 +65,7 @@ func (it *listSeriesIterator) Err() error {
return nil
}
func TestShardSeriesSet(t *testing.T) {
func TestPartitionSeriesSet(t *testing.T) {
newSeries := func(l map[string]string, s []sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
@ -77,7 +77,7 @@ func TestShardSeriesSet(t *testing.T) {
// The input sets in order (samples in series in b are strictly
// after those in a).
a, b SeriesSet
// The composition of a and b in the shard series set must yield
// The composition of a and b in the partition series set must yield
// results equivalent to the result series set.
exp SeriesSet
}{
@ -170,7 +170,7 @@ func TestShardSeriesSet(t *testing.T) {
Outer:
for _, c := range cases {
res := newShardSeriesSet(c.a, c.b)
res := newPartitionSeriesSet(c.a, c.b)
for {
eok, rok := c.exp.Next(), res.Next()