prometheus/storage/metric/tiered/memory.go
Bjoern Rabenstein de9a88b964 Ensure temporal order in streams.
BenchmarkAppendSample.* before this change:

BenchmarkAppendSample1   1000000              1142 ns/op
--- BENCH: BenchmarkAppendSample1
        memory_test.go:81: 1 cycles with 9992.000000 bytes per cycle, totalling 9992
        memory_test.go:81: 100 cycles with 250.399994 bytes per cycle, totalling 25040
        memory_test.go:81: 10000 cycles with 239.428802 bytes per cycle, totalling 2394288
        memory_test.go:81: 1000000 cycles with 255.504684 bytes per cycle, totalling 255504688
BenchmarkAppendSample10   500000              3823 ns/op
--- BENCH: BenchmarkAppendSample10
        memory_test.go:81: 1 cycles with 15536.000000 bytes per cycle, totalling 15536
        memory_test.go:81: 100 cycles with 662.239990 bytes per cycle, totalling 66224
        memory_test.go:81: 10000 cycles with 601.937622 bytes per cycle, totalling 6019376
        memory_test.go:81: 500000 cycles with 598.582764 bytes per cycle, totalling 299291408
BenchmarkAppendSample100           50000             41111 ns/op
--- BENCH: BenchmarkAppendSample100
        memory_test.go:81: 1 cycles with 79824.000000 bytes per cycle, totalling 79824
        memory_test.go:81: 100 cycles with 4924.479980 bytes per cycle, totalling 492448
        memory_test.go:81: 10000 cycles with 4278.019043 bytes per cycle, totalling 42780192
        memory_test.go:81: 50000 cycles with 4275.242676 bytes per cycle, totalling 213762144
BenchmarkAppendSample1000           5000            533933 ns/op
--- BENCH: BenchmarkAppendSample1000
        memory_test.go:81: 1 cycles with 840224.000000 bytes per cycle, totalling 840224
        memory_test.go:81: 100 cycles with 62789.281250 bytes per cycle, totalling 6278928
        memory_test.go:81: 5000 cycles with 55208.601562 bytes per cycle, totalling 276043008
ok      github.com/prometheus/prometheus/storage/metric/tiered  27.828s

BenchmarkAppendSample.* after this change:

BenchmarkAppendSample1   1000000              1109 ns/op
--- BENCH: BenchmarkAppendSample1
        memory_test.go:131: 1 cycles with 9992.000000 bytes per cycle, totalling 9992
        memory_test.go:131: 100 cycles with 250.399994 bytes per cycle, totalling 25040
        memory_test.go:131: 10000 cycles with 239.220795 bytes per cycle, totalling 2392208
        memory_test.go:131: 1000000 cycles with 255.492630 bytes per cycle, totalling 255492624
BenchmarkAppendSample10   500000              3663 ns/op
--- BENCH: BenchmarkAppendSample10
        memory_test.go:131: 1 cycles with 15536.000000 bytes per cycle, totalling 15536
        memory_test.go:131: 100 cycles with 662.239990 bytes per cycle, totalling 66224
        memory_test.go:131: 10000 cycles with 601.889587 bytes per cycle, totalling 6018896
        memory_test.go:131: 500000 cycles with 598.550903 bytes per cycle, totalling 299275472
BenchmarkAppendSample100           50000             40694 ns/op
--- BENCH: BenchmarkAppendSample100
        memory_test.go:131: 1 cycles with 78976.000000 bytes per cycle, totalling 78976
        memory_test.go:131: 100 cycles with 4928.319824 bytes per cycle, totalling 492832
        memory_test.go:131: 10000 cycles with 4277.961426 bytes per cycle, totalling 42779616
        memory_test.go:131: 50000 cycles with 4275.054199 bytes per cycle, totalling 213752720
BenchmarkAppendSample1000           5000            530744 ns/op
--- BENCH: BenchmarkAppendSample1000
        memory_test.go:131: 1 cycles with 842192.000000 bytes per cycle, totalling 842192
        memory_test.go:131: 100 cycles with 62765.441406 bytes per cycle, totalling 6276544
        memory_test.go:131: 5000 cycles with 55209.812500 bytes per cycle, totalling 276049056
ok      github.com/prometheus/prometheus/storage/metric/tiered  27.468s

Change-Id: Idaa339cd83539b5e4391614541a2c3a04002d66d
2014-04-22 15:22:54 +02:00

562 lines
13 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tiered
import (
"sort"
"sync"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility"
)
// initialSeriesArenaSize is the initial capacity, in samples, of the value
// slice allocated for every new arrayStream. A capacity of 4*60 holds one
// hour's worth of samples per metric without any major reallocations,
// assuming one sample every 15 seconds (i.e. four samples per minute).
const initialSeriesArenaSize = 4 * 60
type stream interface {
add(metric.Values)
clone() metric.Values
expunge(age clientmodel.Timestamp) metric.Values
size() int
clear()
metric() clientmodel.Metric
getValueAtTime(t clientmodel.Timestamp) metric.Values
getBoundaryValues(in metric.Interval) metric.Values
getRangeValues(in metric.Interval) metric.Values
}
// arrayStream is a stream implementation that keeps the samples of one time
// series in a single slice.
type arrayStream struct {
// The embedded lock is taken by the methods that read or modify values.
sync.RWMutex
// m is the metric returned by metric(); it is set on construction.
m clientmodel.Metric
// values holds the samples; add expects them to be sorted by timestamp and
// the search-based getters rely on that ordering.
values metric.Values
}
// metric implements the stream interface. It hands back the metric the stream
// was created with; no lock is taken.
func (s *arrayStream) metric() clientmodel.Metric {
	owner := s.m
	return owner
}
// add implements the stream interface. Both s.values and v are expected to be
// sorted by timestamp already. Leading values of v whose timestamp is older
// than the most recent value in s.values are dropped (and a warning is
// logged); everything after them is appended under the write lock.
func (s *arrayStream) add(v metric.Values) {
	s.Lock()
	defer s.Unlock()

	if n := len(s.values); n > 0 {
		newest := s.values[n-1].Timestamp

		// Count the leading run of values that would violate temporal order.
		skipped := 0
		for skipped < len(v) && newest > v[skipped].Timestamp {
			skipped++
		}

		if skipped > 0 {
			glog.Warningf(
				"Skipped out-of-order values while adding to %#v: %#v",
				s.m, v[:skipped],
			)
			v = v[skipped:]
		}
	}

	s.values = append(s.values, v...)
}
// clone implements the stream interface. It returns a freshly allocated copy
// of all values currently held, taken under the read lock, so the caller may
// use the result without affecting the stream.
func (s *arrayStream) clone() metric.Values {
	s.RLock()
	defer s.RUnlock()

	snapshot := make(metric.Values, len(s.values))
	copy(snapshot, s.values)

	return snapshot
}
// expunge implements the stream interface. It removes the leading values of
// the stream whose timestamp is not after t and returns them. The binary
// search relies on s.values being sorted by timestamp.
//
// The returned slice shares its backing array with the values that remain in
// the stream. Its capacity is therefore capped at its length, so that an
// append performed by the caller reallocates instead of overwriting values
// still held by the stream.
func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values {
	s.Lock()
	defer s.Unlock()

	// Index of the first value that is strictly newer than t.
	i := sort.Search(len(s.values), func(i int) bool {
		return s.values[i].Timestamp.After(t)
	})

	expunged := s.values[:i:i]
	s.values = s.values[i:]

	return expunged
}
// getValueAtTime implements the stream interface. Under the read lock it
// returns:
//   - no values if the stream is empty,
//   - the only value if the stream holds exactly one,
//   - the first value if t lies before (or at) the first timestamp,
//   - the last value if t lies after the last timestamp,
//   - the exact value if one has timestamp t,
//   - otherwise the two values surrounding t.
//
// The binary search relies on s.values being sorted by timestamp.
func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) metric.Values {
	s.RLock()
	defer s.RUnlock()

	// BUG(all): May be avenues for simplification.
	n := len(s.values)
	if n == 0 {
		return metric.Values{}
	}
	if n == 1 {
		return metric.Values{s.values[0]}
	}

	// Position of the first value whose timestamp is at or after t.
	pos := sort.Search(n, func(i int) bool {
		return !s.values[i].Timestamp.Before(t)
	})

	switch {
	case pos == 0:
		return metric.Values{s.values[0]}
	case pos == n:
		return metric.Values{s.values[n-1]}
	case s.values[pos].Timestamp.Equal(t):
		return metric.Values{s.values[pos]}
	default:
		return metric.Values{s.values[pos-1], s.values[pos]}
	}
}
// getBoundaryValues implements the stream interface. It returns the first and
// the last value whose timestamps fall into the closed interval
// [in.OldestInclusive, in.NewestInclusive]. If only one value falls into the
// interval, just that value is returned; if none does, or if the interval is
// inverted (oldest after newest), the result is empty.
//
// The binary searches rely on s.values being sorted by timestamp.
func (s *arrayStream) getBoundaryValues(in metric.Interval) metric.Values {
	s.RLock()
	defer s.RUnlock()

	// oldest is the index of the first value inside the interval.
	oldest := sort.Search(len(s.values), func(i int) bool {
		return !s.values[i].Timestamp.Before(in.OldestInclusive)
	})
	// newest is the index just past the last value inside the interval.
	newest := sort.Search(len(s.values), func(i int) bool {
		return s.values[i].Timestamp.After(in.NewestInclusive)
	})

	// An inverted interval can yield newest < oldest; slicing with such
	// bounds would panic, so treat it like an empty range.
	if newest <= oldest {
		return metric.Values{}
	}
	if newest-oldest == 1 {
		return metric.Values{s.values[oldest]}
	}
	return metric.Values{s.values[oldest], s.values[newest-1]}
}
// getRangeValues implements the stream interface. It returns a copy of all
// values whose timestamps fall into the closed interval
// [in.OldestInclusive, in.NewestInclusive]. If no value falls into the
// interval, or if the interval is inverted (oldest after newest), the result
// is empty.
//
// The binary searches rely on s.values being sorted by timestamp.
func (s *arrayStream) getRangeValues(in metric.Interval) metric.Values {
	s.RLock()
	defer s.RUnlock()

	// oldest is the index of the first value inside the interval.
	oldest := sort.Search(len(s.values), func(i int) bool {
		return !s.values[i].Timestamp.Before(in.OldestInclusive)
	})
	// newest is the index just past the last value inside the interval.
	newest := sort.Search(len(s.values), func(i int) bool {
		return s.values[i].Timestamp.After(in.NewestInclusive)
	})

	// An inverted interval can yield newest < oldest; allocating a slice of
	// negative length would panic, so treat it like an empty range.
	if newest <= oldest {
		return metric.Values{}
	}

	result := make(metric.Values, newest-oldest)
	copy(result, s.values[oldest:newest])

	return result
}
// size implements the stream interface. It returns the number of values
// currently held. The read lock is taken so that the length is not read while
// add or expunge is modifying the slice.
func (s *arrayStream) size() int {
	s.RLock()
	defer s.RUnlock()

	return len(s.values)
}
// clear implements the stream interface. It replaces the held values with an
// empty slice. The write lock is taken so that the replacement cannot
// interleave with readers or with add/expunge.
func (s *arrayStream) clear() {
	s.Lock()
	defer s.Unlock()

	s.values = metric.Values{}
}
// newArrayStream returns an empty arrayStream for metric m whose value slice
// is pre-allocated with a capacity of initialSeriesArenaSize.
func newArrayStream(m clientmodel.Metric) *arrayStream {
	s := new(arrayStream)
	s.m = m
	s.values = make(metric.Values, 0, initialSeriesArenaSize)
	return s
}
// memorySeriesStorage keeps time series and the label indexes needed to look
// them up in memory.
type memorySeriesStorage struct {
// The embedded lock is taken by the exported methods before they touch the
// maps below.
sync.RWMutex
// wmCache is optional; when non-nil, AppendSample records the timestamp of
// each appended sample in it.
wmCache *watermarkCache
// fingerprintToSeries maps a metric fingerprint to the stream holding the
// samples of that series.
fingerprintToSeries map[clientmodel.Fingerprint]stream
// labelPairToFingerprints maps a label name/value pair to the set of
// fingerprints of all series carrying that pair.
labelPairToFingerprints map[metric.LabelPair]utility.Set
// labelNameToLabelValues maps a label name to the set of values seen for it.
labelNameToLabelValues map[clientmodel.LabelName]utility.Set
}
// MemorySeriesOptions bundles options used by NewMemorySeriesStorage to create
// a memory series storage.
type MemorySeriesOptions struct {
// If provided, this WatermarkCache will be updated for any samples that
// are appended to the memorySeriesStorage. It may be left nil, in which
// case no watermarks are recorded.
WatermarkCache *watermarkCache
}
// AppendSamples appends the given samples one by one via AppendSample. It
// stops at, and returns, the first error reported by AppendSample; samples
// appended before that error stay in the storage.
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) error {
	for _, sample := range samples {
		if err := s.AppendSample(sample); err != nil {
			return err
		}
	}
	return nil
}
// AppendSample adds a single sample to the series identified by the sample's
// metric, creating (and indexing) the series if it does not exist yet. If a
// watermark cache was configured, the sample's timestamp is recorded there as
// the high watermark of the series. The returned error is always nil.
func (s *memorySeriesStorage) AppendSample(sample *clientmodel.Sample) error {
	s.Lock()
	defer s.Unlock()

	fp := &clientmodel.Fingerprint{}
	fp.LoadFromMetric(sample.Metric)

	target := s.getOrCreateSeries(sample.Metric, fp)
	pair := metric.SamplePair{
		Value:     sample.Value,
		Timestamp: sample.Timestamp,
	}
	target.add(metric.Values{pair})

	if s.wmCache == nil {
		return nil
	}
	s.wmCache.Put(fp, &watermarks{High: sample.Timestamp})

	return nil
}
// CreateEmptySeries registers a series for the given metric without adding
// any samples. The metric is copied first, so the storage does not hold on to
// the caller's map. If the series already exists, nothing changes.
func (s *memorySeriesStorage) CreateEmptySeries(metric clientmodel.Metric) {
	s.Lock()
	defer s.Unlock()

	copied := make(clientmodel.Metric, len(metric))
	for name, val := range metric {
		copied[name] = val
	}

	fp := &clientmodel.Fingerprint{}
	fp.LoadFromMetric(copied)

	s.getOrCreateSeries(copied, fp)
}
// getOrCreateSeries returns the stream stored under fingerprint fp. If there
// is none, a new arrayStream for metric m is created, stored, and every label
// pair of m is entered into the label-pair and label-name indexes.
//
// The method takes no lock itself; it reads and writes the storage maps, so
// callers must hold the write lock.
func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric, fp *clientmodel.Fingerprint) stream {
	if existing, found := s.fingerprintToSeries[*fp]; found {
		return existing
	}

	created := newArrayStream(m)
	s.fingerprintToSeries[*fp] = created

	for name, value := range m {
		pair := metric.LabelPair{
			Name:  name,
			Value: value,
		}

		// Index: label pair -> fingerprints.
		fingerprints, known := s.labelPairToFingerprints[pair]
		if !known {
			fingerprints = utility.Set{}
			s.labelPairToFingerprints[pair] = fingerprints
		}
		fingerprints.Add(*fp)

		// Index: label name -> label values.
		labelValues, known := s.labelNameToLabelValues[name]
		if !known {
			labelValues = utility.Set{}
			s.labelNameToLabelValues[name] = labelValues
		}
		labelValues.Add(value)
	}

	return created
}
// Flush expunges, from every series, the samples selected by
// stream.expunge(flushOlderThan) and sends them, one batch per series, to
// queue. Series that are empty afterwards are dropped from the storage
// together with their index entries.
//
// The first phase runs under the read lock. The second phase re-checks each
// candidate under the write lock before dropping it, because samples may have
// been appended to the series in between.
func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) {
	emptySeries := []clientmodel.Fingerprint{}

	s.RLock()
	for fingerprint, stream := range s.fingerprintToSeries {
		toArchive := stream.expunge(flushOlderThan)
		queued := make(clientmodel.Samples, 0, len(toArchive))
		// NOTE: This duplication will go away soon.
		for _, value := range toArchive {
			queued = append(queued, &clientmodel.Sample{
				Metric:    stream.metric(),
				Timestamp: value.Timestamp,
				Value:     value.Value,
			})
		}

		// BUG(all): this can deadlock if the queue is full, as we only ever clear
		// the queue after calling this method:
		// https://github.com/prometheus/prometheus/issues/275
		if len(queued) > 0 {
			queue <- queued
		}

		if stream.size() == 0 {
			emptySeries = append(emptySeries, fingerprint)
		}
	}
	s.RUnlock()

	for i := range emptySeries {
		fp := &emptySeries[i]

		// The map lookup must happen under the lock as well: reading the map
		// while another goroutine appends a new series is a data race.
		s.Lock()
		if series, ok := s.fingerprintToSeries[*fp]; ok && series.size() == 0 {
			s.dropSeries(fp)
		}
		s.Unlock()
	}
}
// dropLabelValue removes value v from the set of values indexed under label
// name l. When the set becomes empty, the entry for l is removed from the
// index altogether. Unknown label names are ignored. No lock is taken here.
func (s *memorySeriesStorage) dropLabelValue(l clientmodel.LabelName, v clientmodel.LabelValue) {
	values, known := s.labelNameToLabelValues[l]
	if !known {
		return
	}

	values.Remove(v)
	if len(values) > 0 {
		return
	}
	delete(s.labelNameToLabelValues, l)
}
// dropSeries removes the series stored under the given fingerprint, including
// its samples, and removes the fingerprint from the label-pair index. A label
// pair that no longer refers to any series is deleted from the index, and its
// value is dropped from the label-name index too. Unknown fingerprints are
// ignored. No lock is taken here.
func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) {
	series, found := s.fingerprintToSeries[*fingerprint]
	if !found {
		return
	}

	for name, value := range series.metric() {
		pair := metric.LabelPair{
			Name:  name,
			Value: value,
		}

		fingerprints, indexed := s.labelPairToFingerprints[pair]
		if !indexed {
			continue
		}

		fingerprints.Remove(*fingerprint)
		if len(fingerprints) > 0 {
			// Other series still carry this label pair.
			continue
		}

		delete(s.labelPairToFingerprints, pair)
		s.dropLabelValue(name, value)
	}

	delete(s.fingerprintToSeries, *fingerprint)
}
// appendSamplesWithoutIndexing appends raw samples to the series stored under
// the given fingerprint, bypassing the label indexes. If the series does not
// exist yet, it is created with an empty metric. Only used to add data to
// views, which don't need to lookup by metric.
func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *clientmodel.Fingerprint, samples metric.Values) {
	s.Lock()
	defer s.Unlock()

	target, exists := s.fingerprintToSeries[*fingerprint]
	if !exists {
		target = newArrayStream(clientmodel.Metric{})
		s.fingerprintToSeries[*fingerprint] = target
	}

	target.add(samples)
}
// GetFingerprintsForLabelMatchers returns the fingerprints of all series that
// satisfy every given matcher.
//
// For an equality matcher the candidate set is looked up directly in the
// label-pair index. For any other matcher type, all known values of the label
// name are filtered through the matcher and the fingerprint sets of the
// matching label pairs are united. The candidate sets of all matchers are
// then intersected.
//
// The result is nil if no matchers are given, or as soon as any lookup comes
// up empty. The error is only non-nil if the label value lookup fails.
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) (clientmodel.Fingerprints, error) {
	s.RLock()
	defer s.RUnlock()

	candidates := []utility.Set{}
	for _, matcher := range labelMatchers {
		if matcher.Type == metric.Equal {
			exact, found := s.labelPairToFingerprints[metric.LabelPair{
				Name:  matcher.Name,
				Value: matcher.Value,
			}]
			if !found {
				return nil, nil
			}
			candidates = append(candidates, exact)
			continue
		}

		// Non-equality matcher: filter the known values of the label name.
		known, err := s.getLabelValuesForLabelName(matcher.Name)
		if err != nil {
			return nil, err
		}
		matching := matcher.Filter(known)
		if len(matching) == 0 {
			return nil, nil
		}

		united := utility.Set{}
		for _, value := range matching {
			part, found := s.labelPairToFingerprints[metric.LabelPair{
				Name:  matcher.Name,
				Value: value,
			}]
			if !found {
				return nil, nil
			}
			united = united.Union(part)
		}
		candidates = append(candidates, united)
	}

	if len(candidates) == 0 {
		return nil, nil
	}

	common := candidates[0]
	for _, other := range candidates[1:] {
		common = common.Intersection(other)
	}

	result := clientmodel.Fingerprints{}
	for _, element := range common.Elements() {
		// A fresh variable per iteration, as its address is retained.
		fp := element.(clientmodel.Fingerprint)
		result = append(result, &fp)
	}

	return result, nil
}
// GetLabelValuesForLabelName returns all values indexed for the given label
// name. It is the locking wrapper around getLabelValuesForLabelName and holds
// the read lock for the duration of the lookup.
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
	s.RLock()
	defer s.RUnlock()

	values, err := s.getLabelValuesForLabelName(labelName)
	return values, err
}
// getLabelValuesForLabelName collects the values indexed for the given label
// name into a slice. It returns nil if the label name is not indexed. The
// order of the values follows map iteration and is therefore unspecified. The
// error is always nil. No lock is taken here.
func (s *memorySeriesStorage) getLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
	indexed, found := s.labelNameToLabelValues[labelName]
	if !found {
		return nil, nil
	}

	result := make(clientmodel.LabelValues, 0, len(indexed))
	for element := range indexed {
		result = append(result, element.(clientmodel.LabelValue))
	}

	return result, nil
}
// GetMetricForFingerprint returns a copy of the metric of the series stored
// under fingerprint f, or nil if there is no such series. The error is always
// nil.
func (s *memorySeriesStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) {
	s.RLock()
	defer s.RUnlock()

	series, found := s.fingerprintToSeries[*f]
	if !found {
		return nil, nil
	}

	// Copy label by label so callers cannot modify the stored metric.
	copied := clientmodel.Metric{}
	for name, val := range series.metric() {
		copied[name] = val
	}

	return copied, nil
}
// HasFingerprint reports whether a series is stored under fingerprint f.
func (s *memorySeriesStorage) HasFingerprint(f *clientmodel.Fingerprint) bool {
	s.RLock()
	defer s.RUnlock()

	if _, found := s.fingerprintToSeries[*f]; found {
		return true
	}
	return false
}
// CloneSamples returns a copy of all samples of the series stored under
// fingerprint f, or nil if there is no such series.
func (s *memorySeriesStorage) CloneSamples(f *clientmodel.Fingerprint) metric.Values {
	s.RLock()
	defer s.RUnlock()

	if series, found := s.fingerprintToSeries[*f]; found {
		return series.clone()
	}
	return nil
}
// GetValueAtTime delegates to getValueAtTime of the series stored under
// fingerprint f and returns its result, or nil if there is no such series.
func (s *memorySeriesStorage) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) metric.Values {
	s.RLock()
	defer s.RUnlock()

	if series, found := s.fingerprintToSeries[*f]; found {
		return series.getValueAtTime(t)
	}
	return nil
}
// GetBoundaryValues delegates to getBoundaryValues of the series stored under
// fingerprint f and returns its result, or nil if there is no such series.
func (s *memorySeriesStorage) GetBoundaryValues(f *clientmodel.Fingerprint, i metric.Interval) metric.Values {
	s.RLock()
	defer s.RUnlock()

	if series, found := s.fingerprintToSeries[*f]; found {
		return series.getBoundaryValues(i)
	}
	return nil
}
// GetRangeValues delegates to getRangeValues of the series stored under
// fingerprint f and returns its result, or nil if there is no such series.
func (s *memorySeriesStorage) GetRangeValues(f *clientmodel.Fingerprint, i metric.Interval) metric.Values {
	s.RLock()
	defer s.RUnlock()

	if series, found := s.fingerprintToSeries[*f]; found {
		return series.getRangeValues(i)
	}
	return nil
}
// Close releases all series and indexes by setting the three storage maps to
// nil under the write lock.
func (s *memorySeriesStorage) Close() {
	s.Lock()
	defer s.Unlock()

	s.fingerprintToSeries, s.labelPairToFingerprints, s.labelNameToLabelValues = nil, nil, nil
}
// GetAllValuesForLabel scans the metrics of all stored series and returns the
// distinct values found for the given label name. The order of the values
// follows map iteration and is therefore unspecified. The result is nil if no
// series carries the label; the error is always nil.
func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) {
	s.RLock()
	defer s.RUnlock()

	seen := map[clientmodel.LabelValue]bool{}
	for _, series := range s.fingerprintToSeries {
		value, present := series.metric()[labelName]
		if !present || seen[value] {
			continue
		}
		seen[value] = true
		values = append(values, value)
	}

	return values, err
}
// NewMemorySeriesStorage returns a memory series storage ready to use. All
// internal maps start out empty, and the watermark cache from the options
// (which may be nil) is adopted as is.
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
	storage := new(memorySeriesStorage)

	storage.wmCache = o.WatermarkCache
	storage.fingerprintToSeries = make(map[clientmodel.Fingerprint]stream)
	storage.labelPairToFingerprints = make(map[metric.LabelPair]utility.Set)
	storage.labelNameToLabelValues = make(map[clientmodel.LabelName]utility.Set)

	return storage
}