Extract labels package

This commit is contained in:
Fabian Reinartz 2016-12-21 09:39:01 +01:00
parent ee217adc7e
commit ede733ab6c
9 changed files with 213 additions and 145 deletions

View file

@ -14,6 +14,7 @@ import (
"time"
"github.com/fabxc/tsdb"
"github.com/fabxc/tsdb/labels"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
@ -162,16 +163,16 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount
ts := int64(model.Now())
type sample struct {
labels tsdb.Labels
labels labels.Labels
value int64
}
scrape := make(map[uint64]*sample, len(metrics))
for _, m := range metrics {
lset := make(tsdb.Labels, 0, len(m))
lset := make(labels.Labels, 0, len(m))
for k, v := range m {
lset = append(lset, tsdb.Label{Name: string(k), Value: string(v)})
lset = append(lset, labels.Label{Name: string(k), Value: string(v)})
}
sort.Sort(lset)

91
db.go
View file

@ -7,7 +7,6 @@ import (
"os"
"path/filepath"
"reflect"
"sort"
"strconv"
"sync"
"time"
@ -15,8 +14,8 @@ import (
"golang.org/x/sync/errgroup"
"github.com/cespare/xxhash"
"github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
)
@ -112,7 +111,7 @@ type Appender interface {
// AddSeries(Labels) uint64
// Add adds a sample pair for the referenced series.
Add(lset Labels, t int64, v float64)
Add(lset labels.Labels, t int64, v float64)
// Commit submits the collected samples and purges the batch.
Commit() error
@ -131,7 +130,7 @@ type bucketAppender struct {
buckets [][]hashedSample
}
func (ba *bucketAppender) Add(lset Labels, t int64, v float64) {
func (ba *bucketAppender) Add(lset labels.Labels, t int64, v float64) {
h := lset.Hash()
s := h >> (64 - shardShift)
@ -163,7 +162,7 @@ func (ba *bucketAppender) Commit() error {
type hashedSample struct {
hash uint64
labels Labels
labels labels.Labels
t int64
v float64
@ -334,7 +333,7 @@ func (s *Shard) persist() error {
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
lset Labels
lset labels.Labels
chunk chunks.Chunk
// Caching fields.
@ -363,86 +362,6 @@ func (cd *chunkDesc) append(ts int64, v float64) (err error) {
return nil
}
// Label is a key/value pair of strings.
type Label struct {
Name, Value string
}
// Labels is a sorted set of labels. Order has to be guaranteed upon
// instantiation.
type Labels []Label
func (ls Labels) Len() int { return len(ls) }
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
// Hash returns a hash value for the label set.
func (ls Labels) Hash() uint64 {
b := make([]byte, 0, 1024)
for _, v := range ls {
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}
// Get returns the value for the label with the given name.
// Returns an empty string if the label doesn't exist.
func (ls Labels) Get(name string) string {
for _, l := range ls {
if l.Name == name {
return l.Value
}
}
return ""
}
// Equals returns whether the two label sets are equal.
func (ls Labels) Equals(o Labels) bool {
if len(ls) != len(o) {
return false
}
for i, l := range ls {
if l.Name != o[i].Name || l.Value != o[i].Value {
return false
}
}
return true
}
// Map returns a string map of the labels.
func (ls Labels) Map() map[string]string {
m := make(map[string]string, len(ls))
for _, l := range ls {
m[l.Name] = l.Value
}
return m
}
// NewLabels returns a sorted Labels from the given labels.
// The caller has to guarantee that all label names are unique.
func NewLabels(ls ...Label) Labels {
set := make(Labels, 0, len(ls))
for _, l := range ls {
set = append(set, l)
}
sort.Sort(set)
return set
}
// LabelsFromMap returns new sorted Labels from the given map.
func LabelsFromMap(m map[string]string) Labels {
l := make([]Label, 0, len(m))
for k, v := range m {
l = append(l, Label{Name: k, Value: v})
}
return NewLabels(l...)
}
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error

View file

@ -7,6 +7,7 @@ import (
"sync"
"github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels"
)
// HeadBlock handles reads and writes of time series data within a time window.
@ -99,7 +100,7 @@ func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) {
// get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet.
func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc {
func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
cds := h.descs[hash]
for _, cd := range cds {
if cd.lset.Equals(lset) {
@ -122,7 +123,7 @@ func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc {
}
// append adds the sample to the headblock.
func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error {
func (h *HeadBlock) append(hash uint64, lset labels.Labels, ts int64, v float64) error {
if err := h.get(hash, lset).append(ts, v); err != nil {
return err
}

89
labels/labels.go Normal file
View file

@ -0,0 +1,89 @@
package labels
import (
"sort"
"github.com/cespare/xxhash"
)
const sep = '\xff'
// Label is a key/value pair of strings.
type Label struct {
Name, Value string
}
// Labels is a sorted set of labels. Order has to be guaranteed upon
// instantiation.
type Labels []Label
func (ls Labels) Len() int { return len(ls) }
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
// Hash returns a hash value for the label set.
func (ls Labels) Hash() uint64 {
b := make([]byte, 0, 1024)
for _, v := range ls {
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}
// Get returns the value for the label with the given name.
// Returns an empty string if the label doesn't exist.
func (ls Labels) Get(name string) string {
for _, l := range ls {
if l.Name == name {
return l.Value
}
}
return ""
}
// Equals returns whether the two label sets are equal.
func (ls Labels) Equals(o Labels) bool {
if len(ls) != len(o) {
return false
}
for i, l := range ls {
if l.Name != o[i].Name || l.Value != o[i].Value {
return false
}
}
return true
}
// Map returns a string map of the labels.
func (ls Labels) Map() map[string]string {
m := make(map[string]string, len(ls))
for _, l := range ls {
m[l.Name] = l.Value
}
return m
}
// NewLabels returns a sorted Labels from the given labels.
// The caller has to guarantee that all label names are unique.
func NewLabels(ls ...Label) Labels {
set := make(Labels, 0, len(ls))
for _, l := range ls {
set = append(set, l)
}
sort.Sort(set)
return set
}
// LabelsFromMap returns new sorted Labels from the given map.
func LabelsFromMap(m map[string]string) Labels {
l := make([]Label, 0, len(m))
for k, v := range m {
l = append(l, Label{Name: k, Value: v})
}
return NewLabels(l...)
}

68
labels/selector.go Normal file
View file

@ -0,0 +1,68 @@
package labels
import "regexp"
// Selector holds constraints for matching against a label set.
type Selector []Matcher
// Matches returns whether the labels satisfy all matchers.
func (s Selector) Matches(labels Labels) bool {
for _, m := range s {
if v := labels.Get(m.Name()); !m.Matches(v) {
return false
}
}
return true
}
// Matcher specifies a constraint for the value of a label.
type Matcher interface {
// Name returns the label name the matcher should apply to.
Name() string
// Matches checks whether a value fulfills the constraints.
Matches(v string) bool
}
type equalMatcher struct {
name, value string
}
func (m *equalMatcher) Name() string { return m.name }
func (m *equalMatcher) Matches(v string) bool { return v == m.value }
// NewEqualMatcher returns a new matcher matching an exact label value.
func NewEqualMatcher(name, value string) Matcher {
return &equalMatcher{name: name, value: value}
}
type regexpMatcher struct {
name string
re *regexp.Regexp
}
func (m *regexpMatcher) Name() string { return m.name }
func (m *regexpMatcher) Matches(v string) bool { return m.re.MatchString(v) }
// NewRegexpMatcher returns a new matcher verifying that a value matches
// the regular expression pattern.
func NewRegexpMatcher(name, pattern string) (Matcher, error) {
re, err := regexp.Compile(pattern)
if err != nil {
return nil, err
}
return &regexpMatcher{name: name, re: re}, nil
}
// notMatcher inverts the matching result for a matcher.
type notMatcher struct {
Matcher
}
func (m *notMatcher) Matches(v string) bool {
return !m.Matcher.Matches(v)
}
// Not inverts the matcher's matching result.
func Not(m Matcher) Matcher {
return &notMatcher{m}
}

View file

@ -7,38 +7,20 @@ import (
"strings"
"github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels"
)
// Matcher matches a string.
type Matcher interface {
Name() string
// Match returns true if the matcher applies to the string value.
Match(v string) bool
}
type equalMatcher struct {
name string
value string
}
func MatchEquals(n, v string) Matcher {
return &equalMatcher{name: n, value: v}
}
func (m *equalMatcher) Name() string { return m.name }
func (m *equalMatcher) Match(v string) bool { return v == m.value }
// Querier provides querying access over time series data of a fixed
// time range.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
Select(...Matcher) SeriesSet
Select(...labels.Matcher) SeriesSet
// LabelValues returns all potential values for a label name.
LabelValues(string) ([]string, error)
// LabelValuesFor returns all potential values for a label name.
// under the constraint of another label.
LabelValuesFor(string, Label) ([]string, error)
LabelValuesFor(string, labels.Label) ([]string, error)
// Close releases the resources of the Querier.
Close() error
@ -47,7 +29,7 @@ type Querier interface {
// Series represents a single time series.
type Series interface {
// Labels returns the complete set of labels identifying the series.
Labels() Labels
Labels() labels.Labels
// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
@ -75,7 +57,7 @@ func (db *DB) Querier(mint, maxt int64) Querier {
return q
}
func (q *querier) Select(ms ...Matcher) SeriesSet {
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
// We gather the non-overlapping series from every shard and simply
// return their union.
r := &mergedSeriesSet{}
@ -134,7 +116,7 @@ func mergeStrings(a, b []string) []string {
return res
}
func (q *querier) LabelValuesFor(string, Label) ([]string, error) {
func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
@ -180,11 +162,11 @@ func (q *shardQuerier) LabelValues(n string) ([]string, error) {
return res, nil
}
func (q *shardQuerier) LabelValuesFor(string, Label) ([]string, error) {
func (q *shardQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *shardQuerier) Select(ms ...Matcher) SeriesSet {
func (q *shardQuerier) Select(ms ...labels.Matcher) SeriesSet {
// Sets from different blocks have no time overlap. The reference numbers
// they emit point to series sorted in lexicographic order.
// We can fully connect partial series by simply comparing with the previous
@ -221,7 +203,7 @@ func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQue
}
}
func (q *blockQuerier) Select(ms ...Matcher) SeriesSet {
func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
var its []Postings
for _, m := range ms {
its = append(its, q.selectSingle(m))
@ -235,7 +217,7 @@ func (q *blockQuerier) Select(ms ...Matcher) SeriesSet {
}
}
func (q *blockQuerier) selectSingle(m Matcher) Postings {
func (q *blockQuerier) selectSingle(m labels.Matcher) Postings {
tpls, err := q.index.LabelValues(m.Name())
if err != nil {
return errPostings{err: err}
@ -249,7 +231,7 @@ func (q *blockQuerier) selectSingle(m Matcher) Postings {
if err != nil {
return errPostings{err: err}
}
if m.Match(vals[0]) {
if m.Matches(vals[0]) {
res = append(res, vals[0])
}
}
@ -295,7 +277,7 @@ func (q *blockQuerier) LabelValues(name string) ([]string, error) {
return res, nil
}
func (q *blockQuerier) LabelValuesFor(string, Label) ([]string, error) {
func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
@ -360,7 +342,7 @@ func newShardSeriesSet(a, b SeriesSet) *shardSeriesSet {
// compareLabels compares the two label sets.
// The result will be 0 if a==b, <0 if a < b, and >0 if a > b.
func compareLabels(a, b Labels) int {
func compareLabels(a, b labels.Labels) int {
l := len(a)
if len(b) < l {
l = len(b)
@ -475,7 +457,7 @@ func (s *blockSeriesSet) Err() error { return s.err }
// chunkSeries is a series that is backed by a sequence of chunks holding
// time series data.
type chunkSeries struct {
labels Labels
labels labels.Labels
chunks []ChunkMeta // in-order chunk refs
// chunk is a function that retrieves chunks based on a reference
@ -483,7 +465,7 @@ type chunkSeries struct {
chunk func(ref uint32) (chunks.Chunk, error)
}
func (s *chunkSeries) Labels() Labels {
func (s *chunkSeries) Labels() labels.Labels {
return s.labels
}
@ -525,7 +507,7 @@ type chainedSeries struct {
series []Series
}
func (s *chainedSeries) Labels() Labels {
func (s *chainedSeries) Labels() labels.Labels {
return s.series[0].Labels()
}

View file

@ -6,6 +6,7 @@ import (
"strings"
"github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels"
"github.com/pkg/errors"
)
@ -269,7 +270,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
//
// The references are expected to be sorted and match the order of
// the underlying strings.
labels := make(Labels, 0, k)
lbls := make(labels.Labels, 0, k)
for i := 0; i < len(offsets); i += 2 {
n, err := r.lookupSymbol(offsets[i])
@ -280,7 +281,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
if err != nil {
return nil, errors.Wrap(err, "symbol lookup")
}
labels = append(labels, Label{
lbls = append(lbls, labels.Label{
Name: string(n),
Value: string(v),
})
@ -337,7 +338,7 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
}
return &chunkSeries{
labels: labels,
labels: lbls,
chunks: chunks,
chunk: r.series.Chunk,
}, nil

View file

@ -10,12 +10,15 @@ import (
func BenchmarkLabelMapAccess(b *testing.B) {
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
"prometheus": "prometheus-core-1",
"datacenter": "eu-west-1",
"pod_name": "abcdef-99999-defee",
}
var v string
@ -33,12 +36,15 @@ func BenchmarkLabelMapAccess(b *testing.B) {
func BenchmarkLabelSetAccess(b *testing.B) {
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
"prometheus": "prometheus-core-1",
"datacenter": "eu-west-1",
"pod_name": "abcdef-99999-defee",
}
ls := tsdb.LabelsFromMap(m)

View file

@ -10,6 +10,7 @@ import (
"strings"
"github.com/bradfitz/slice"
"github.com/fabxc/tsdb/labels"
"github.com/pkg/errors"
)
@ -26,7 +27,7 @@ type SeriesWriter interface {
// WriteSeries writes the time series data chunks for a single series.
// The reference is used to resolve the correct series in the written index.
// It only has to be valid for the duration of the write.
WriteSeries(ref uint32, l Labels, cds []*chunkDesc) error
WriteSeries(ref uint32, l labels.Labels, cds []*chunkDesc) error
// Size returns the size of the data written so far.
Size() int64
@ -71,7 +72,7 @@ func (w *seriesWriter) writeMeta() error {
return w.write(w.w, b[:])
}
func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) error {
func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []*chunkDesc) error {
// Initialize with meta data.
if w.n == 0 {
if err := w.writeMeta(); err != nil {
@ -156,7 +157,7 @@ type IndexWriter interface {
// of chunks that the index can reference.
// The reference number is used to resolve a series against the postings
// list iterator. It only has to be available during the write processing.
AddSeries(ref uint32, l Labels, chunks ...ChunkMeta)
AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta)
// WriteStats writes final stats for the indexed block.
WriteStats(BlockStats) error
@ -177,7 +178,7 @@ type IndexWriter interface {
}
type indexWriterSeries struct {
labels Labels
labels labels.Labels
chunks []ChunkMeta // series file offset of chunks
offset uint32 // index file offset of series reference
}
@ -240,7 +241,7 @@ func (w *indexWriter) writeMeta() error {
return w.write(w.w, b[:])
}
func (w *indexWriter) AddSeries(ref uint32, lset Labels, chunks ...ChunkMeta) {
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) {
// Populate the symbol table from all label sets we have to reference.
for _, l := range lset {
w.symbols[l.Name] = 0