From 83574b1565c5952457ce8f9731c4c685f704a009 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sun, 4 Dec 2016 13:16:11 +0100 Subject: [PATCH] Add new interfaces and skeleton --- db.go | 196 +++++++++++++++++++++++++++++++++++++++----- db_test.go | 62 ++++++++++++++ index.go | 135 ++++++++++++++++++++++++++++++ shard.go | 126 ++++++++++++++++++++++++++++ test/conv_test.go | 26 ++++++ test/labels_test.go | 50 +++++++++++ 6 files changed, 576 insertions(+), 19 deletions(-) create mode 100644 db_test.go create mode 100644 index.go create mode 100644 test/conv_test.go create mode 100644 test/labels_test.go diff --git a/db.go b/db.go index 074cd80216..c9d06b8a03 100644 --- a/db.go +++ b/db.go @@ -2,11 +2,12 @@ package tsdb import ( - "encoding/binary" - "sync" + "fmt" + "os" + "sort" "time" - "github.com/fabxc/tsdb/chunks" + "github.com/cespare/xxhash" "github.com/prometheus/common/log" ) @@ -25,39 +26,166 @@ type DB struct { logger log.Logger opts *Options - shards map[uint64]*TimeShards + shards []*SeriesShard } +// TODO(fabxc): make configurable +const ( + numSeriesShards = 32 + maxChunkSize = 1024 +) + // Open or create a new DB. func Open(path string, l log.Logger, opts *Options) (*DB, error) { if opts == nil { opts = DefaultOptions } + if err := os.MkdirAll(path, 0777); err != nil { + return nil, err + } c := &DB{ - logger: l, - opts: opts, + logger: l, + opts: opts, } + // Initialize vertical shards. + // TODO(fabxc): validate shard number to be power of 2, which is required + // for the bitshift-modulo when finding the right shard. + for i := 0; i < numSeriesShards; i++ { + c.shards = append(c.shards, NewSeriesShard()) + } + + // TODO(fabxc): run background compaction + GC. + return c, nil } +// Close the database. +func (db *DB) Close() error { + return fmt.Errorf("not implemented") +} + +// Querier returns a new querier over the database. +func (db *DB) Querier(start, end int64) Querier { + return nil +} + +// Matcher matches a string. +type Matcher interface { + // Match returns true if the matcher applies to the string value. + Match(v string) bool +} + +// Querier provides querying access over time series data of a fixed +// time range. +type Querier interface { + // Iterator returns an interator over the inverted index that + // matches the key label by the constraints of Matcher. + Iterator(key string, m Matcher) Iterator + + // Labels resolves a label reference into a set of labels. + Labels(ref LabelRefs) (Labels, error) + + // Series returns series provided in the index iterator. + Series(Iterator) []Series + + // Close releases the resources of the Querier. + Close() error + + // Range returns the timestamp range of the Querier. + Range() (start, end int64) +} + +// Series represents a single time series. +type Series interface { + // LabelsRef returns the label set reference + LabelRefs() LabelRefs + // Iterator returns a new iterator of the data of the series. + Iterator() SeriesIterator +} + +// SeriesIterator iterates over the data of a time series. +type SeriesIterator interface { + // Seek advances the iterator forward to the given timestamp. + // If there's no value exactly at ts, it advances to the last value + // before ts. + Seek(ts int64) bool + // Values returns the current timestamp/value pair. + Values() (int64, float64) + // Next advances the iterator by one. + Next() bool + // Err returns the current error. + Err() error +} + +type LabelRefs struct { + block uint64 + offsets []uint32 +} + +// Label is a key/value pair of strings. type Label struct { Name, Value string } -// LabelSet is a sorted set of labels. Order has to be guaranteed upon +// Labels is a sorted set of labels. Order has to be guaranteed upon // instantiation. -type LabelSet []Label +type Labels []Label -func (ls LabelSet) Len() int { return len(ls) } -func (ls LabelSet) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i]} -func (ls LabelSet) Less(i, j int) bool { return ls[i].Name < ls[j].Name } +func (ls Labels) Len() int { return len(ls) } +func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] } +func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name } -// NewLabelSet returns a sorted LabelSet from the given labels. +// Hash returns a hash value for the label set. +func (ls Labels) Hash() uint64 { + b := make([]byte, 0, 512) + for _, v := range ls { + b = append(b, v.Name...) + b = append(b, '\xff') + b = append(b, v.Value...) + b = append(b, '\xff') + } + return xxhash.Sum64(b) +} + +// Get returns the value for the label with the given name. +// Returns an empty string if the label doesn't exist. +func (ls Labels) Get(name string) string { + for _, l := range ls { + if l.Name == name { + return l.Value + } + } + return "" +} + +// Equals returns whether the two label sets are equal. +func (ls Labels) Equals(o Labels) bool { + if len(ls) != len(o) { + return false + } + for i, l := range ls { + if l.Name != o[i].Name || l.Value != o[i].Value { + return false + } + } + return true +} + +// Map returns a string map of the labels. +func (ls Labels) Map() map[string]string { + m := make(map[string]string, len(ls)) + for _, l := range ls { + m[l.Name] = l.Value + } + return m +} + +// NewLabels returns a sorted Labels from the given labels. // The caller has to guarantee that all label names are unique. -func NewLabelSet(ls ...Label) LabelSet { - set := make(LabelSet, 0, len(l)) +func NewLabels(ls ...Label) Labels { + set := make(Labels, 0, len(ls)) for _, l := range ls { set = append(set, l) } @@ -66,11 +194,41 @@ func NewLabelSet(ls ...Label) LabelSet { return set } -type Vector struct { - LabelSets []LabelSet - Values []float64 +// LabelsFromMap returns new sorted Labels from the given map. +func LabelsFromMap(m map[string]string) Labels { + l := make([]Label, 0, len(m)) + for k, v := range m { + l = append(l, Label{Name: k, Value: v}) + } + return NewLabels(l...) } -func (db *DB) AppendVector(v *Vector) error { +// Vector is a set of LabelSet associated with one value each. +// Label sets and values must have equal length. +type Vector struct { + LabelSets []Labels + Values []float64 +} + +// AppendVector adds values for a list of label sets for the given timestamp +// in milliseconds. +func (db *DB) AppendVector(ts int64, v *Vector) error { + // Sequentially add samples to shards. + for i, ls := range v.LabelSets { + h := ls.Hash() + shard := db.shards[h>>(64-uint(len(db.shards)))] + + // TODO(fabxc): benchmark whether grouping into shards and submitting to + // shards in batches is more efficient. + shard.head.mtx.Lock() + + if err := shard.head.append(h, ls, ts, v.Values[i]); err != nil { + shard.head.mtx.Unlock() + // TODO(fabxc): handle gracefully and collect multi-error. + return err + } + shard.head.mtx.Unlock() + } + return nil -} \ No newline at end of file +} diff --git a/db_test.go b/db_test.go new file mode 100644 index 0000000000..99a3cc1acd --- /dev/null +++ b/db_test.go @@ -0,0 +1,62 @@ +package tsdb + +import "testing" + +func BenchmarkLabelSetFromMap(b *testing.B) { + m := map[string]string{ + "job": "node", + "instance": "123.123.1.211:9090", + "path": "/api/v1/namespaces//deployments/", + "method": "GET", + "namespace": "system", + "status": "500", + } + var ls Labels + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + ls = LabelsFromMap(m) + } + _ = ls +} + +func BenchmarkMapFromLabels(b *testing.B) { + m := map[string]string{ + "job": "node", + "instance": "123.123.1.211:9090", + "path": "/api/v1/namespaces//deployments/", + "method": "GET", + "namespace": "system", + "status": "500", + } + ls := LabelsFromMap(m) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + m = ls.Map() + } +} + +func BenchmarkLabelSetEquals(b *testing.B) { + // The vast majority of comparisons will be against a matching label set. + m := map[string]string{ + "job": "node", + "instance": "123.123.1.211:9090", + "path": "/api/v1/namespaces//deployments/", + "method": "GET", + "namespace": "system", + "status": "500", + } + ls := LabelsFromMap(m) + var res bool + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + res = ls.Equals(ls) + } + _ = res +} diff --git a/index.go b/index.go new file mode 100644 index 0000000000..f6a1adc8f4 --- /dev/null +++ b/index.go @@ -0,0 +1,135 @@ +package tsdb + +import "sort" + +// Index provides read access to an inverted index. +type Index interface { + Postings(ref uint32) Iterator +} + +// memIndex is an inverted in-memory index. +type memIndex struct { + lastID uint32 + m map[string][]uint32 +} + +// Postings returns an iterator over the postings list for s. +func (ix *memIndex) Postings(s string) Iterator { + return &listIterator{list: ix.m[s]} +} + +// add adds a document to the index. The caller has to ensure that no +// term argument appears twice. +func (ix *memIndex) add(terms ...string) uint32 { + ix.lastID++ + + for _, t := range terms { + ix.m[t] = append(ix.m[t], ix.lastID) + } + + return ix.lastID +} + +// newMemIndex returns a new in-memory index. +func newMemIndex() *memIndex { + return &memIndex{m: make(map[string][]uint32)} +} + +// Iterator provides iterative access over a postings list. +type Iterator interface { + // Next advances the iterator and returns true if another + // value was found. + Next() bool + // Seek advances the iterator to value v or greater and returns + // true if a value was found. + Seek(v uint32) bool + // Value returns the value at the current iterator position. + Value() uint32 +} + +// compressIndex returns a compressed index for the given input index. +func compressIndex(ix Index) { + +} + +// Intersect returns a new iterator over the intersection of the +// input iterators. +func Intersect(its ...Iterator) Iterator { + if len(its) == 0 { + return nil + } + a := its[0] + + for _, b := range its[1:] { + a = &intersectIterator{a: a, b: b} + } + return a +} + +type intersectIterator struct { + a, b Iterator +} + +func (it *intersectIterator) Value() uint32 { + return 0 +} + +func (it *intersectIterator) Next() bool { + return false +} + +func (it *intersectIterator) Seek(id uint32) bool { + return false +} + +// Merge returns a new iterator over the union of the input iterators. +func Merge(its ...Iterator) Iterator { + if len(its) == 0 { + return nil + } + a := its[0] + + for _, b := range its[1:] { + a = &mergeIterator{a: a, b: b} + } + return a +} + +type mergeIterator struct { + a, b Iterator +} + +func (it *mergeIterator) Value() uint32 { + return 0 +} + +func (it *mergeIterator) Next() bool { + return false +} + +func (it *mergeIterator) Seek(id uint32) bool { + return false +} + +// listIterator implements the Iterator interface over a plain list. +type listIterator struct { + list []uint32 + idx int +} + +func (it *listIterator) Value() uint32 { + return it.list[it.idx] +} + +func (it *listIterator) Next() bool { + it.idx++ + return it.idx < len(it.list) +} + +func (it *listIterator) Seek(x uint32) bool { + // Do binary search between current position and end. + it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { + return it.list[i+it.idx] >= x + }) + return it.idx < len(it.list) +} diff --git a/shard.go b/shard.go index e69de29bb2..9ad62fd236 100644 --- a/shard.go +++ b/shard.go @@ -0,0 +1,126 @@ +package tsdb + +import ( + "fmt" + "io" + "math" + "sync" + + "github.com/fabxc/tsdb/chunks" +) + +const sep = '\xff' + +// SeriesShard handles reads and writes of time series falling into +// a hashed shard of a series. +type SeriesShard struct { + mtx sync.RWMutex + blocks *Block + head *HeadBlock +} + +// NewSeriesShard returns a new SeriesShard. +func NewSeriesShard() *SeriesShard { + return &SeriesShard{ + // TODO(fabxc): restore from checkpoint. + head: &HeadBlock{ + index: newMemIndex(), + descs: map[uint64][]*chunkDesc{}, + values: map[string][]string{}, + forward: map[uint32]*chunkDesc{}, + }, + // TODO(fabxc): provide access to persisted blocks. + } +} + +// HeadBlock handles reads and writes of time series data within a time window. +type HeadBlock struct { + mtx sync.RWMutex + descs map[uint64][]*chunkDesc // labels hash to possible chunks descs + forward map[uint32]*chunkDesc // chunk ID to chunk desc + values map[string][]string // label names to possible values + index *memIndex // inverted index for label pairs +} + +// Block handles reads against a completed block of time series data within a time window. +type Block struct { +} + +// WriteTo serializes the current head block contents into w. +func (h *HeadBlock) WriteTo(w io.Writer) (int64, error) { + h.mtx.RLock() + defer h.mtx.RUnlock() + + return 0, fmt.Errorf("not implemented") +} + +// get retrieves the chunk with the hash and label set and creates +// a new one if it doesn't exist yet. +func (h *HeadBlock) get(hash uint64, lset Labels) (*chunkDesc, bool) { + cds := h.descs[hash] + for _, cd := range cds { + if cd.lset.Equals(lset) { + return cd, false + } + } + // None of the given chunks was for the series, create a new one. + cd := &chunkDesc{ + lset: lset, + chunk: chunks.NewXORChunk(int(math.MaxInt64)), + } + + h.descs[hash] = append(cds, cd) + return cd, true +} + +// append adds the sample to the headblock. If the series is seen +// for the first time it creates a chunk and index entries for it. +// +// TODO(fabxc): switch to single writer and append queue with optimistic concurrency? +func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error { + chkd, created := h.get(hash, lset) + if created { + // Add each label pair as a term to the inverted index. + terms := make([]string, 0, len(lset)) + b := make([]byte, 0, 64) + + for _, l := range lset { + b = append(b, l.Name...) + b = append(b, sep) + b = append(b, l.Value...) + + terms = append(terms, string(b)) + b = b[:0] + } + id := h.index.add(terms...) + + // Store forward index for the returned ID. + h.forward[id] = chkd + } + return chkd.append(ts, v) +} + +// chunkDesc wraps a plain data chunk and provides cached meta data about it. +type chunkDesc struct { + lset Labels + chunk chunks.Chunk + + // Caching fields. + lastTimestamp int64 + lastValue float64 + + app chunks.Appender // Current appender for the chunks. +} + +func (cd *chunkDesc) append(ts int64, v float64) (err error) { + if cd.app == nil { + cd.app, err = cd.chunk.Appender() + if err != nil { + return err + } + } + cd.lastTimestamp = ts + cd.lastValue = v + + return cd.app.Append(ts, v) +} diff --git a/test/conv_test.go b/test/conv_test.go new file mode 100644 index 0000000000..3816db2e99 --- /dev/null +++ b/test/conv_test.go @@ -0,0 +1,26 @@ +package test + +import "testing" + +func BenchmarkMapConversion(b *testing.B) { + type key string + type val string + + m := map[key]val{ + "job": "node", + "instance": "123.123.1.211:9090", + "path": "/api/v1/namespaces//deployments/", + "method": "GET", + "namespace": "system", + "status": "500", + } + + var sm map[string]string + + for i := 0; i < b.N; i++ { + sm = make(map[string]string, len(m)) + for k, v := range m { + sm[string(k)] = string(v) + } + } +} diff --git a/test/labels_test.go b/test/labels_test.go new file mode 100644 index 0000000000..507dfce233 --- /dev/null +++ b/test/labels_test.go @@ -0,0 +1,50 @@ +package test + +import ( + "testing" + + "github.com/fabxc/tsdb" +) + +func BenchmarkLabelMapAccess(b *testing.B) { + m := map[string]string{ + "job": "node", + "instance": "123.123.1.211:9090", + "path": "/api/v1/namespaces//deployments/", + "method": "GET", + "namespace": "system", + "status": "500", + } + + var v string + + for k := range m { + b.Run(k, func(b *testing.B) { + for i := 0; i < b.N; i++ { + v = m[k] + } + }) + } +} + +func BenchmarkLabelSetAccess(b *testing.B) { + m := map[string]string{ + "job": "node", + "instance": "123.123.1.211:9090", + "path": "/api/v1/namespaces//deployments/", + "method": "GET", + "namespace": "system", + "status": "500", + } + ls := tsdb.LabelsFromMap(m) + + var v string + + for _, l := range ls { + b.Run(l.Name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + v = ls.Get(l.Name) + } + }) + } +}