Separate storage implementation from interfaces.

This was initially motivated by wanting to distribute the rule checker
tool under `tools/rule_checker`. However, this was not possible without
also distributing the LevelDB dynamic libraries because the tool
transitively depended on Levigo:

rule checker -> query layer -> tiered storage layer -> leveldb

This change separates external storage interfaces from the
implementation (tiered storage, leveldb storage, memory storage) by
putting them into separate packages:

- storage/metric: public, implementation-agnostic interfaces
- storage/metric/tiered: tiered storage implementation, including memory
                         and LevelDB storage.

I initially also considered splitting up the implementation into
separate packages for tiered storage, memory storage, and LevelDB
storage, but these are currently so intertwined that it would be another
major project in itself.

The query layers and most other parts of Prometheus no longer have any
notion of the storage implementation and just use whatever implementation
they get passed in via interfaces.

The rule_checker is now a static binary :)
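
To make the effect concrete, here is a rough sketch of the resulting
wiring (illustrative only, not code from this commit): consumers such as
the rule checker accept the public interface, and only the binary that
wants disk persistence constructs the tiered implementation.

package main

import (
	"time"

	"github.com/prometheus/prometheus/storage/metric"
	"github.com/prometheus/prometheus/storage/metric/tiered"
)

// checkRules is a hypothetical consumer in the spirit of rule_checker: it
// only sees the implementation-agnostic interface from storage/metric, so
// nothing here drags in Levigo/LevelDB.
func checkRules(storage metric.PreloadingPersistence) {
	// ... evaluate rule expressions, e.g. via ast.EvalVectorInstant, which
	// now also takes a metric.PreloadingPersistence.
}

func main() {
	// Only the server constructs the concrete tiered (memory + LevelDB)
	// storage and passes it on as the interface type; tests or tools can
	// substitute any other implementation.
	storage, err := tiered.NewTieredStorage(2500, 1000, 5*time.Second, 0, "/tmp/metrics")
	if err != nil {
		panic(err)
	}
	defer storage.Close()

	checkRules(storage)
}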

Change-Id: I793bbf631a8648ca31790e7e772ecf9c2b92f7a0
Julius Volz 2014-04-11 09:27:05 +02:00
parent 3e969a8ca2
commit 01f652cb4c
47 changed files with 748 additions and 678 deletions

main.go

@ -30,7 +30,7 @@ import (
"github.com/prometheus/prometheus/notification"
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/metric/tiered"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
"github.com/prometheus/prometheus/web"
@ -85,10 +85,10 @@ type prometheus struct {
ruleManager rules.RuleManager
targetManager retrieval.TargetManager
notifications chan notification.NotificationReqs
storage *metric.TieredStorage
storage *tiered.TieredStorage
remoteTSDBQueue *remote.TSDBQueueManager
curationState metric.CurationStateUpdater
curationState tiered.CurationStateUpdater
closeOnce sync.Once
}
@ -124,13 +124,13 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
return nil
}
processor := metric.NewCompactionProcessor(&metric.CompactionProcessorOptions{
processor := tiered.NewCompactionProcessor(&tiered.CompactionProcessorOptions{
MaximumMutationPoolBatch: groupSize * 3,
MinimumGroupSize: groupSize,
})
defer processor.Close()
curator := metric.NewCurator(&metric.CuratorOptions{
curator := tiered.NewCurator(&tiered.CuratorOptions{
Stop: p.stopBackgroundOperations,
ViewQueue: p.storage.ViewQueue,
@ -158,12 +158,12 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
return nil
}
processor := metric.NewDeletionProcessor(&metric.DeletionProcessorOptions{
processor := tiered.NewDeletionProcessor(&tiered.DeletionProcessorOptions{
MaximumMutationPoolBatch: batchSize,
})
defer processor.Close()
curator := metric.NewCurator(&metric.CuratorOptions{
curator := tiered.NewCurator(&tiered.CuratorOptions{
Stop: p.stopBackgroundOperations,
ViewQueue: p.storage.ViewQueue,
@ -240,7 +240,7 @@ func main() {
glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
}
ts, err := metric.NewTieredStorage(uint(*diskAppendQueueCapacity), 100, *arenaFlushInterval, *arenaTTL, *metricsStoragePath)
ts, err := tiered.NewTieredStorage(uint(*diskAppendQueueCapacity), 100, *arenaFlushInterval, *arenaTTL, *metricsStoragePath)
if err != nil {
glog.Fatal("Error opening storage: ", err)
}


@ -118,11 +118,11 @@ func (rule *AlertingRule) Name() string {
return rule.name
}
func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
}
func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error) {
// Get the raw value of the rule expression.
exprResult, err := rule.EvalRaw(timestamp, storage)
if err != nil {


@ -357,7 +357,7 @@ func labelsToKey(labels clientmodel.Metric) uint64 {
}
// EvalVectorInstant evaluates a VectorNode with an instant query.
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (vector Vector, err error) {
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) (vector Vector, err error) {
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage, queryStats)
if err != nil {
return
@ -367,7 +367,7 @@ func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage
}
// EvalVectorRange evaluates a VectorNode with a range query.
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (Matrix, error) {
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) (Matrix, error) {
// Explicitly initialize to an empty matrix since a nil Matrix encodes to
// null in JSON.
matrix := Matrix{}


@ -150,7 +150,7 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
}
// EvalToString evaluates the given node into a string of the given format.
func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage *metric.TieredStorage, queryStats *stats.TimerGroup) string {
func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) string {
viewTimer := queryStats.GetTimer(stats.TotalViewBuildingTime).Start()
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage, queryStats)
viewTimer.Stop()


@ -48,13 +48,13 @@ type QueryAnalyzer struct {
IntervalRanges IntervalRangeMap
// The underlying storage to which the query will be applied. Needed for
// extracting timeseries fingerprint information during query analysis.
storage *metric.TieredStorage
storage metric.Persistence
}
// NewQueryAnalyzer returns a pointer to a newly instantiated
// QueryAnalyzer. The storage is needed to extract timeseries
// fingerprint information during query analysis.
func NewQueryAnalyzer(storage *metric.TieredStorage) *QueryAnalyzer {
func NewQueryAnalyzer(storage metric.Persistence) *QueryAnalyzer {
return &QueryAnalyzer{
FullRanges: FullRangeMap{},
IntervalRanges: IntervalRangeMap{},
@ -104,14 +104,14 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) {
Walk(analyzer, node)
}
func viewAdapterForInstantQuery(node Node, timestamp clientmodel.Timestamp, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
func viewAdapterForInstantQuery(node Node, timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) (*viewAdapter, error) {
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node)
analyzeTimer.Stop()
requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start()
viewBuilder := metric.NewViewRequestBuilder()
viewBuilder := storage.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges {
viewBuilder.GetMetricRange(&fingerprint, timestamp.Add(-rangeDuration), timestamp)
}
@ -121,8 +121,7 @@ func viewAdapterForInstantQuery(node Node, timestamp clientmodel.Timestamp, stor
requestBuildTimer.Stop()
buildTimer := queryStats.GetTimer(stats.InnerViewBuildingTime).Start()
// BUG(julius): Clear Law of Demeter violation.
view, err := analyzer.storage.MakeView(viewBuilder, 60*time.Second, queryStats)
view, err := viewBuilder.Execute(60*time.Second, queryStats)
buildTimer.Stop()
if err != nil {
return nil, err
@ -130,14 +129,14 @@ func viewAdapterForInstantQuery(node Node, timestamp clientmodel.Timestamp, stor
return NewViewAdapter(view, storage, queryStats), nil
}
func viewAdapterForRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
func viewAdapterForRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) (*viewAdapter, error) {
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node)
analyzeTimer.Stop()
requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start()
viewBuilder := metric.NewViewRequestBuilder()
viewBuilder := storage.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges {
if interval < rangeDuration {
viewBuilder.GetMetricRange(&fingerprint, start.Add(-rangeDuration), end)
@ -151,7 +150,7 @@ func viewAdapterForRangeQuery(node Node, start clientmodel.Timestamp, end client
requestBuildTimer.Stop()
buildTimer := queryStats.GetTimer(stats.InnerViewBuildingTime).Start()
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second, queryStats)
view, err := viewBuilder.Execute(time.Duration(60)*time.Second, queryStats)
buildTimer.Stop()
if err != nil {
return nil, err


@ -39,7 +39,7 @@ type viewAdapter struct {
stalenessPolicy StalenessPolicy
// AST-global storage to use for operations that are not supported by views
// (i.e. fingerprint->metric lookups).
storage *metric.TieredStorage
storage metric.Persistence
// The materialized view which contains all timeseries data required for
// executing a query.
view metric.View
@ -184,7 +184,7 @@ func (v *viewAdapter) GetRangeValues(fingerprints clientmodel.Fingerprints, inte
// NewViewAdapter returns an initialized view adapter with a default
// staleness policy (based on the --defaultStalenessDelta flag).
func NewViewAdapter(view metric.View, storage *metric.TieredStorage, queryStats *stats.TimerGroup) *viewAdapter {
func NewViewAdapter(view metric.View, storage metric.Persistence, queryStats *stats.TimerGroup) *viewAdapter {
stalenessPolicy := StalenessPolicy{
DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second,
}


@ -51,7 +51,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector {
return vector
}
func storeMatrix(storage metric.TieredStorage, matrix ast.Matrix) (err error) {
func storeMatrix(storage metric.Persistence, matrix ast.Matrix) (err error) {
pendingSamples := clientmodel.Samples{}
for _, sampleSet := range matrix {
for _, sample := range sampleSet.Values {


@ -49,7 +49,7 @@ type ruleManager struct {
done chan bool
interval time.Duration
storage *metric.TieredStorage
storage metric.PreloadingPersistence
results chan<- *extraction.Result
notifications chan<- notification.NotificationReqs
@ -59,7 +59,7 @@ type ruleManager struct {
type RuleManagerOptions struct {
EvaluationInterval time.Duration
Storage *metric.TieredStorage
Storage metric.PreloadingPersistence
Notifications chan<- notification.NotificationReqs
Results chan<- *extraction.Result


@ -34,11 +34,11 @@ type RecordingRule struct {
func (rule RecordingRule) Name() string { return rule.name }
func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
}
func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error) {
// Get the raw value of the rule expression.
vector, err := rule.EvalRaw(timestamp, storage)
if err != nil {


@ -29,9 +29,9 @@ type Rule interface {
Name() string
// EvalRaw evaluates the rule's vector expression without triggering any
// other actions, like recording or alerting.
EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error)
EvalRaw(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error)
// Eval evaluates the rule, including any associated recording or alerting actions.
Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error)
Eval(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error)
// ToDotGraph returns a Graphviz dot graph of the rule.
ToDotGraph() string
// String returns a human-readable string representation of the rule.


@ -24,7 +24,7 @@ import (
"github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/metric/tiered"
"github.com/prometheus/prometheus/utility/test"
)
@ -53,7 +53,7 @@ func vectorComparisonString(expected []string, actual []string) string {
}
type testTieredStorageCloser struct {
storage *metric.TieredStorage
storage *tiered.TieredStorage
directory test.Closer
}
@ -64,10 +64,10 @@ func (t testTieredStorageCloser) Close() {
// This is copied from storage/metric/helpers_test.go, which is unfortunate but
// presently required to make things work.
func NewTestTieredStorage(t test.Tester) (storage *metric.TieredStorage, closer test.Closer) {
func NewTestTieredStorage(t test.Tester) (storage *tiered.TieredStorage, closer test.Closer) {
var directory test.TemporaryDirectory
directory = test.NewTemporaryDirectory("test_tiered_storage", t)
storage, err := metric.NewTieredStorage(2500, 1000, 5*time.Second, 0*time.Second, directory.Path())
storage, err := tiered.NewTieredStorage(2500, 1000, 5*time.Second, 0*time.Second, directory.Path())
if err != nil {
if storage != nil {
@ -91,12 +91,12 @@ func NewTestTieredStorage(t test.Tester) (storage *metric.TieredStorage, closer
return
}
func newTestStorage(t test.Tester) (storage *metric.TieredStorage, closer test.Closer) {
func newTestStorage(t test.Tester) (storage *tiered.TieredStorage, closer test.Closer) {
storage, closer = NewTestTieredStorage(t)
if storage == nil {
t.Fatal("storage == nil")
}
storeMatrix(*storage, testMatrix)
storeMatrix(storage, testMatrix)
return
}


@ -1 +0,0 @@
command-line-arguments.test


@ -13,14 +13,17 @@
package metric
import clientmodel "github.com/prometheus/client_golang/model"
import (
"time"
// AppendBatch models a batch of samples to be stored.
type AppendBatch map[clientmodel.Fingerprint]SampleSet
clientmodel "github.com/prometheus/client_golang/model"
// MetricPersistence is a system for storing metric samples in a persistence
"github.com/prometheus/prometheus/stats"
)
// Persistence is a system for storing metric samples in a persistence
// layer.
type MetricPersistence interface {
type Persistence interface {
// A storage system may rely on external resources and thusly should be
// closed when finished.
Close()
@ -42,6 +45,15 @@ type MetricPersistence interface {
GetAllValuesForLabel(clientmodel.LabelName) (clientmodel.LabelValues, error)
}
// PreloadingPersistence is a Persistence which supports building
// preloaded views.
type PreloadingPersistence interface {
Persistence
// NewViewRequestBuilder furnishes a ViewRequestBuilder for remarking what
// types of queries to perform.
NewViewRequestBuilder() ViewRequestBuilder
}
// View provides a view of the values in the datastore subject to the request
// of a preloading operation.
type View interface {
@ -55,9 +67,73 @@ type View interface {
GetRangeValues(*clientmodel.Fingerprint, Interval) Values
}
// ViewableMetricPersistence is a MetricPersistence that is able to present the
// ViewablePersistence is a Persistence that is able to present the
// samples it has stored as a View.
type ViewableMetricPersistence interface {
MetricPersistence
type ViewablePersistence interface {
Persistence
View
}
// ViewRequestBuilder represents the summation of all datastore queries that
// shall be performed to extract values. Call the Get... methods to record the
// queries. Once done, use HasOp and PopOp to retrieve the resulting
// operations. The operations are sorted by their fingerprint (and, for equal
// fingerprints, by the StartsAt timestamp of their operation).
type ViewRequestBuilder interface {
// GetMetricAtTime records a query to get, for the given Fingerprint,
// either the value at that time if there is a match or the one or two
// values adjacent thereto.
GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp)
// GetMetricAtInterval records a query to get, for the given
// Fingerprint, either the value at that interval from From through
// Through if there is a match or the one or two values adjacent for
// each point.
GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration)
// GetMetricRange records a query to get, for the given Fingerprint, the
// values that occur inclusively from From through Through.
GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp)
// GetMetricRangeAtInterval records a query to get value ranges at
// intervals for the given Fingerprint:
//
// |----| |----| |----| |----|
// ^ ^ ^ ^ ^ ^
// | \------------/ \----/ |
// from interval rangeDuration through
GetMetricRangeAtInterval(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration)
// Execute materializes a View, subject to a timeout.
Execute(deadline time.Duration, queryStats *stats.TimerGroup) (View, error)
// PopOp emits the next operation in the queue (sorted by
// fingerprint). If called while HasOps returns false, the
// behavior is undefined.
PopOp() Op
// HasOp returns true if there is at least one more operation in the
// queue.
HasOp() bool
}
// Op encapsulates a primitive query operation.
type Op interface {
// Fingerprint returns the fingerprint of the metric this operation
// operates on.
Fingerprint() *clientmodel.Fingerprint
// ExtractSamples extracts samples from a stream of values and advances
// the operation time.
ExtractSamples(Values) Values
// Consumed returns whether the operator has consumed all data it needs.
Consumed() bool
// CurrentTime gets the current operation time. In a newly created op,
// this is the starting time of the operation. During ongoing execution
// of the op, the current time is advanced accordingly. Once no
// subsequent work associated with the operation remains, nil is
// returned.
CurrentTime() clientmodel.Timestamp
}
// CurationState contains high-level curation state information for the
// heads-up-display.
type CurationState struct {
Active bool
Name string
Limit time.Duration
Fingerprint *clientmodel.Fingerprint
}
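
For illustration, the intended flow through these interfaces (as the
query layer above now uses them) looks roughly like this; the helper
below is a sketch, not code from this commit, and it assumes the
Interval field names used elsewhere in this package.

package queryexample

import (
	"time"

	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/stats"
	"github.com/prometheus/prometheus/storage/metric"
)

// rangeValues records a single range query on a fresh ViewRequestBuilder,
// materializes a View under a deadline, and reads the values back out.
func rangeValues(storage metric.PreloadingPersistence, fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp) (metric.Values, error) {
	builder := storage.NewViewRequestBuilder()
	builder.GetMetricRange(fp, from, through)

	view, err := builder.Execute(60*time.Second, stats.NewTimerGroup())
	if err != nil {
		return nil, err
	}
	return view.GetRangeValues(fp, metric.Interval{
		OldestInclusive: from,
		NewestInclusive: through,
	}), nil
}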


@ -98,15 +98,3 @@ func (m *LabelMatcher) Filter(in clientmodel.LabelValues) clientmodel.LabelValue
}
return out
}
func labelMatchersFromLabelSet(l clientmodel.LabelSet) LabelMatchers {
m := make(LabelMatchers, 0, len(l))
for k, v := range l {
m = append(m, &LabelMatcher{
Type: Equal,
Name: k,
Value: v,
})
}
return m
}


@ -15,23 +15,12 @@ package metric
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"sort"
clientmodel "github.com/prometheus/client_golang/model"
)
const (
// sampleSize is the number of bytes per sample in marshalled format.
sampleSize = 16
// formatVersion is used as a version marker in the marshalled format.
formatVersion = 1
// formatVersionSize is the number of bytes used by the serialized formatVersion.
formatVersionSize = 1
)
// MarshalJSON implements json.Marshaler.
func (s SamplePair) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf("{\"Value\": \"%f\", \"Timestamp\": %d}", s.Value, s.Timestamp)), nil
@ -121,7 +110,7 @@ func (v Values) InsideInterval(t clientmodel.Timestamp) bool {
// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
// dropped. The original slice is not mutated
// dropped. The original slice is not mutated.
func (v Values) TruncateBefore(t clientmodel.Timestamp) Values {
index := sort.Search(len(v), func(i int) bool {
timestamp := v[i].Timestamp
@ -132,16 +121,6 @@ func (v Values) TruncateBefore(t clientmodel.Timestamp) Values {
return v[index:]
}
// ToSampleKey returns the SampleKey for these Values.
func (v Values) ToSampleKey(f *clientmodel.Fingerprint) *SampleKey {
return &SampleKey{
Fingerprint: f,
FirstTimestamp: v[0].Timestamp,
LastTimestamp: v[len(v)-1].Timestamp,
SampleCount: uint32(len(v)),
}
}
func (v Values) String() string {
buffer := bytes.Buffer{}
@ -157,48 +136,6 @@ func (v Values) String() string {
return buffer.String()
}
// marshal marshals a group of samples for being written to disk into dest or a
// new slice if dest is insufficiently small.
func (v Values) marshal(dest []byte) []byte {
sz := formatVersionSize + len(v)*sampleSize
if cap(dest) < sz {
dest = make([]byte, sz)
} else {
dest = dest[0:sz]
}
dest[0] = formatVersion
for i, val := range v {
offset := formatVersionSize + i*sampleSize
binary.LittleEndian.PutUint64(dest[offset:], uint64(val.Timestamp.Unix()))
binary.LittleEndian.PutUint64(dest[offset+8:], math.Float64bits(float64(val.Value)))
}
return dest
}
// unmarshalValues decodes marshalled samples into dest and returns either dest
// or a new slice containing those values if dest is insufficiently small.
func unmarshalValues(buf []byte, dest Values) Values {
if buf[0] != formatVersion {
panic("unsupported format version")
}
n := (len(buf) - formatVersionSize) / sampleSize
if cap(dest) < n {
dest = make(Values, n)
} else {
dest = dest[0:n]
}
for i := 0; i < n; i++ {
offset := formatVersionSize + i*sampleSize
dest[i].Timestamp = clientmodel.TimestampFromUnix(int64(binary.LittleEndian.Uint64(buf[offset:])))
dest[i].Value = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(buf[offset+8:])))
}
return dest
}
// SampleSet is Values with a Metric attached.
type SampleSet struct {
Metric clientmodel.Metric


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"flag"
@ -21,13 +21,14 @@ import (
"time"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
clientmodel "github.com/prometheus/client_golang/model"
)
type nopCurationStateUpdater struct{}
func (n *nopCurationStateUpdater) UpdateCurationState(*CurationState) {}
func (n *nopCurationStateUpdater) UpdateCurationState(*metric.CurationState) {}
func generateTestSamples(endTime clientmodel.Timestamp, numTs int, samplesPerTs int, interval time.Duration) clientmodel.Samples {
samples := make(clientmodel.Samples, 0, numTs*samplesPerTs)
@ -63,7 +64,7 @@ func (c *compactionChecker) Operate(key, value interface{}) *storage.OperatorErr
c.t.Fatalf("Chunk FirstTimestamp (%v) is after LastTimestamp (%v): %v", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey)
}
fp := &clientmodel.Fingerprint{}
for _, sample := range value.(Values) {
for _, sample := range value.(metric.Values) {
if sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp) {
c.t.Fatalf("Sample not within chunk boundaries: chunk FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp)
}
@ -75,7 +76,7 @@ func (c *compactionChecker) Operate(key, value interface{}) *storage.OperatorErr
c.t.Fatalf("%d. Expected fingerprint %s, got %s", c.sampleIdx, fp, sampleKey.Fingerprint)
}
sp := &SamplePair{
sp := &metric.SamplePair{
Value: expected.Value,
Timestamp: expected.Timestamp,
}


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"bytes"
@ -26,6 +26,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
@ -38,16 +39,7 @@ var errIllegalIterator = errors.New("iterator invalid")
// CurationStateUpdater receives updates about the curation state.
type CurationStateUpdater interface {
UpdateCurationState(*CurationState)
}
// CurationState contains high-level curation state information for the
// heads-up-display.
type CurationState struct {
Active bool
Name string
Limit time.Duration
Fingerprint *clientmodel.Fingerprint
UpdateCurationState(*metric.CurationState)
}
// CuratorOptions bundles the parameters needed to create a Curator.
@ -144,7 +136,7 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Times
curationDurations.Add(labels, duration)
}(time.Now())
defer status.UpdateCurationState(&CurationState{Active: false})
defer status.UpdateCurationState(&metric.CurationState{Active: false})
iterator, err := samples.NewIterator(true)
if err != nil {
@ -271,7 +263,7 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
curationFilterOperations.Increment(labels)
w.status.UpdateCurationState(&CurationState{
w.status.UpdateCurationState(&metric.CurationState{
Active: true,
Name: w.processor.Name(),
Limit: w.ignoreYoungerThan,


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"sort"


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"sort"
@ -20,10 +20,11 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
)
func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
func GetFingerprintsForLabelSetTests(p metric.Persistence, t test.Tester) {
metrics := []clientmodel.Metric{
{
clientmodel.MetricNameLabel: "test_metric",
@ -47,8 +48,8 @@ func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
},
}
newTestLabelMatcher := func(matchType MatchType, name clientmodel.LabelName, value clientmodel.LabelValue) *LabelMatcher {
m, err := NewLabelMatcher(matchType, name, value)
newTestLabelMatcher := func(matchType metric.MatchType, name clientmodel.LabelName, value clientmodel.LabelValue) *metric.LabelMatcher {
m, err := metric.NewLabelMatcher(matchType, name, value)
if err != nil {
t.Fatalf("Couldn't create label matcher: %v", err)
}
@ -56,61 +57,61 @@ func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
}
scenarios := []struct {
in LabelMatchers
in metric.LabelMatchers
outIndexes []int
}{
{
in: LabelMatchers{
newTestLabelMatcher(Equal, clientmodel.MetricNameLabel, "test_metric"),
in: metric.LabelMatchers{
newTestLabelMatcher(metric.Equal, clientmodel.MetricNameLabel, "test_metric"),
},
outIndexes: []int{0, 1, 2, 3},
},
{
in: LabelMatchers{
newTestLabelMatcher(Equal, clientmodel.MetricNameLabel, "non_existent_metric"),
in: metric.LabelMatchers{
newTestLabelMatcher(metric.Equal, clientmodel.MetricNameLabel, "non_existent_metric"),
},
outIndexes: []int{},
},
{
in: LabelMatchers{
newTestLabelMatcher(Equal, clientmodel.MetricNameLabel, "non_existent_metric"),
newTestLabelMatcher(Equal, "result", "success"),
in: metric.LabelMatchers{
newTestLabelMatcher(metric.Equal, clientmodel.MetricNameLabel, "non_existent_metric"),
newTestLabelMatcher(metric.Equal, "result", "success"),
},
outIndexes: []int{},
},
{
in: LabelMatchers{
newTestLabelMatcher(Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(Equal, "result", "success"),
in: metric.LabelMatchers{
newTestLabelMatcher(metric.Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(metric.Equal, "result", "success"),
},
outIndexes: []int{0, 2},
},
{
in: LabelMatchers{
newTestLabelMatcher(Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(NotEqual, "result", "success"),
in: metric.LabelMatchers{
newTestLabelMatcher(metric.Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(metric.NotEqual, "result", "success"),
},
outIndexes: []int{1, 3},
},
{
in: LabelMatchers{
newTestLabelMatcher(Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(RegexMatch, "result", "foo|success|bar"),
in: metric.LabelMatchers{
newTestLabelMatcher(metric.Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(metric.RegexMatch, "result", "foo|success|bar"),
},
outIndexes: []int{0, 2},
},
{
in: LabelMatchers{
newTestLabelMatcher(Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(RegexNoMatch, "result", "foo|success|bar"),
in: metric.LabelMatchers{
newTestLabelMatcher(metric.Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(metric.RegexNoMatch, "result", "foo|success|bar"),
},
outIndexes: []int{1, 3},
},
{
in: LabelMatchers{
newTestLabelMatcher(Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(RegexNoMatch, "result", "foo|success|bar"),
newTestLabelMatcher(RegexMatch, "method", "os"),
in: metric.LabelMatchers{
newTestLabelMatcher(metric.Equal, clientmodel.MetricNameLabel, "test_metric"),
newTestLabelMatcher(metric.RegexNoMatch, "result", "foo|success|bar"),
newTestLabelMatcher(metric.RegexMatch, "method", "os"),
},
outIndexes: []int{3},
},
@ -152,7 +153,7 @@ func GetFingerprintsForLabelSetTests(p MetricPersistence, t test.Tester) {
}
}
func GetLabelValuesForLabelNameTests(p MetricPersistence, t test.Tester) {
func GetLabelValuesForLabelNameTests(p metric.Persistence, t test.Tester) {
testAppendSamples(p, &clientmodel.Sample{
Value: 0,
Timestamp: 0,
@ -196,7 +197,7 @@ func GetLabelValuesForLabelNameTests(p MetricPersistence, t test.Tester) {
}
}
func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
func GetMetricForFingerprintTests(p metric.Persistence, t test.Tester) {
testAppendSamples(p, &clientmodel.Sample{
Value: 0,
Timestamp: 0,
@ -214,8 +215,8 @@ func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
},
}, t)
result, err := p.GetFingerprintsForLabelMatchers(LabelMatchers{{
Type: Equal,
result, err := p.GetFingerprintsForLabelMatchers(metric.LabelMatchers{{
Type: metric.Equal,
Name: "request_type",
Value: "your_mom",
}})
@ -228,25 +229,25 @@ func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
t.Errorf("Expected one element.")
}
metric, err := p.GetMetricForFingerprint(result[0])
m, err := p.GetMetricForFingerprint(result[0])
if err != nil {
t.Error(err)
}
if metric == nil {
if m == nil {
t.Fatal("Did not expect nil.")
}
if len(metric) != 1 {
if len(m) != 1 {
t.Errorf("Expected one-dimensional metric.")
}
if metric["request_type"] != "your_mom" {
if m["request_type"] != "your_mom" {
t.Errorf("Expected metric to match.")
}
result, err = p.GetFingerprintsForLabelMatchers(LabelMatchers{{
Type: Equal,
result, err = p.GetFingerprintsForLabelMatchers(metric.LabelMatchers{{
Type: metric.Equal,
Name: "request_type",
Value: "your_dad",
}})
@ -259,9 +260,9 @@ func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
t.Errorf("Expected one element.")
}
metric, err = p.GetMetricForFingerprint(result[0])
m, err = p.GetMetricForFingerprint(result[0])
if metric == nil {
if m == nil {
t.Fatal("Did not expect nil.")
}
@ -269,24 +270,24 @@ func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
t.Error(err)
}
if len(metric) != 2 {
if len(m) != 2 {
t.Errorf("Expected two-dimensional metric.")
}
if metric["request_type"] != "your_dad" {
if m["request_type"] != "your_dad" {
t.Errorf("Expected metric to match.")
}
if metric["one-off"] != "value" {
if m["one-off"] != "value" {
t.Errorf("Expected metric to match.")
}
// Verify that mutating a returned metric does not result in the mutated
// metric to be returned at the next GetMetricForFingerprint() call.
metric["one-off"] = "new value"
metric, err = p.GetMetricForFingerprint(result[0])
m["one-off"] = "new value"
m, err = p.GetMetricForFingerprint(result[0])
if metric == nil {
if m == nil {
t.Fatal("Did not expect nil.")
}
@ -294,21 +295,21 @@ func GetMetricForFingerprintTests(p MetricPersistence, t test.Tester) {
t.Error(err)
}
if len(metric) != 2 {
if len(m) != 2 {
t.Errorf("Expected two-dimensional metric.")
}
if metric["request_type"] != "your_dad" {
if m["request_type"] != "your_dad" {
t.Errorf("Expected metric to match.")
}
if metric["one-off"] != "value" {
if m["one-off"] != "value" {
t.Errorf("Expected metric to match.")
}
}
func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
metric := clientmodel.Metric{
func AppendRepeatingValuesTests(p metric.Persistence, t test.Tester) {
m := clientmodel.Metric{
clientmodel.MetricNameLabel: "errors_total",
"controller": "foo",
"operation": "bar",
@ -323,14 +324,14 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
testAppendSamples(p, &clientmodel.Sample{
Value: clientmodel.SampleValue(i),
Timestamp: time,
Metric: metric,
Metric: m,
}, t)
}
}
v, ok := p.(View)
v, ok := p.(metric.View)
if !ok {
// It's purely a benchmark for a MetricPersistence that is not viewable.
// It's purely a benchmark for a Persistence that is not viewable.
return
}
@ -367,8 +368,8 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
}
}
func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
metric := clientmodel.Metric{
func AppendsRepeatingValuesTests(p metric.Persistence, t test.Tester) {
m := clientmodel.Metric{
clientmodel.MetricNameLabel: "errors_total",
"controller": "foo",
"operation": "bar",
@ -384,14 +385,14 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
s = append(s, &clientmodel.Sample{
Value: clientmodel.SampleValue(i),
Timestamp: time,
Metric: metric,
Metric: m,
})
}
}
p.AppendSamples(s)
v, ok := p.(View)
v, ok := p.(metric.View)
if !ok {
// It's purely a benchmark for a MetricPersistance that is not viewable.
return

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"time"


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"testing"


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"
@ -19,6 +19,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
)
@ -28,18 +29,18 @@ var (
testInstant = clientmodel.TimestampFromTime(time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC))
)
func testAppendSamples(p MetricPersistence, s *clientmodel.Sample, t test.Tester) {
func testAppendSamples(p metric.Persistence, s *clientmodel.Sample, t test.Tester) {
err := p.AppendSamples(clientmodel.Samples{s})
if err != nil {
t.Fatal(err)
}
}
func buildLevelDBTestPersistencesMaker(name string, t test.Tester) func() (MetricPersistence, test.Closer) {
return func() (MetricPersistence, test.Closer) {
func buildLevelDBTestPersistencesMaker(name string, t test.Tester) func() (metric.Persistence, test.Closer) {
return func() (metric.Persistence, test.Closer) {
temporaryDirectory := test.NewTemporaryDirectory("get_value_at_time", t)
p, err := NewLevelDBMetricPersistence(temporaryDirectory.Path())
p, err := NewLevelDBPersistence(temporaryDirectory.Path())
if err != nil {
t.Errorf("Could not start up LevelDB: %q\n", err)
}
@ -48,13 +49,13 @@ func buildLevelDBTestPersistencesMaker(name string, t test.Tester) func() (Metri
}
}
func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test.Tester)) func(t test.Tester) {
func buildLevelDBTestPersistence(name string, f func(p metric.Persistence, t test.Tester)) func(t test.Tester) {
return func(t test.Tester) {
temporaryDirectory := test.NewTemporaryDirectory(fmt.Sprintf("test_leveldb_%s", name), t)
defer temporaryDirectory.Close()
p, err := NewLevelDBMetricPersistence(temporaryDirectory.Path())
p, err := NewLevelDBPersistence(temporaryDirectory.Path())
if err != nil {
t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err)
@ -66,7 +67,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test
}
}
func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func(t test.Tester) {
func buildMemoryTestPersistence(f func(p metric.Persistence, t test.Tester)) func(t test.Tester) {
return func(t test.Tester) {
p := NewMemorySeriesStorage(MemorySeriesOptions{})
@ -115,3 +116,15 @@ func NewTestTieredStorage(t test.Tester) (*TieredStorage, test.Closer) {
return storage, closer
}
func labelMatchersFromLabelSet(l clientmodel.LabelSet) metric.LabelMatchers {
m := make(metric.LabelMatchers, 0, len(l))
for k, v := range l {
m = append(m, &metric.LabelMatcher{
Type: metric.Equal,
Name: k,
Value: v,
})
}
return m
}


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"io"
@ -21,6 +21,7 @@ import (
"code.google.com/p/goprotobuf/proto"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
@ -182,7 +183,7 @@ func NewLevelDBLabelNameLabelValuesIndex(o leveldb.LevelDBOptions) (*LevelDBLabe
// LabelPairFingerprintMapping is an in-memory map of LabelPairs to
// Fingerprints.
type LabelPairFingerprintMapping map[LabelPair]clientmodel.Fingerprints
type LabelPairFingerprintMapping map[metric.LabelPair]clientmodel.Fingerprints
// LabelPairFingerprintIndex models a database mapping LabelPairs to
// Fingerprints.
@ -192,8 +193,8 @@ type LabelPairFingerprintIndex interface {
raw.Pruner
IndexBatch(LabelPairFingerprintMapping) error
Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*LabelPair) (ok bool, err error)
Lookup(*metric.LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*metric.LabelPair) (ok bool, err error)
}
// LevelDBLabelPairFingerprintIndex implements LabelPairFingerprintIndex using
@ -228,7 +229,7 @@ func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapp
}
// Lookup implements LabelPairFingerprintMapping.
func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *metric.LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
@ -254,7 +255,7 @@ func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.F
}
// Has implements LabelPairFingerprintMapping.
func (i *LevelDBLabelPairFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
func (i *LevelDBLabelPairFingerprintIndex) Has(p *metric.LabelPair) (ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
@ -615,11 +616,11 @@ func extendLabelNameToLabelValuesIndex(i LabelNameLabelValuesIndex, b Fingerprin
}
func extendLabelPairIndex(i LabelPairFingerprintIndex, b FingerprintMetricMapping) (LabelPairFingerprintMapping, error) {
collection := map[LabelPair]utility.Set{}
collection := map[metric.LabelPair]utility.Set{}
for fp, m := range b {
for n, v := range m {
pair := LabelPair{
pair := metric.LabelPair{
Name: n,
Value: v,
}


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"time"


@ -11,13 +11,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"testing"
"github.com/prometheus/prometheus/storage/metric"
)
func TestInterfaceAdherence(t *testing.T) {
var _ MetricPersistence = &LevelDBMetricPersistence{}
var _ MetricPersistence = NewMemorySeriesStorage(MemorySeriesOptions{})
var _ metric.Persistence = &LevelDBPersistence{}
var _ metric.Persistence = NewMemorySeriesStorage(MemorySeriesOptions{})
}


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"flag"
@ -26,6 +26,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
@ -35,8 +36,8 @@ import (
const sortConcurrency = 2
// LevelDBMetricPersistence is a leveldb-backed persistence layer for metrics.
type LevelDBMetricPersistence struct {
// LevelDBPersistence is a leveldb-backed persistence layer for metrics.
type LevelDBPersistence struct {
CurationRemarks CurationRemarker
FingerprintToMetrics FingerprintMetricIndex
LabelNameToLabelValues LabelNameLabelValuesIndex
@ -52,7 +53,7 @@ type LevelDBMetricPersistence struct {
//
// type FingerprintResolver interface {
// GetFingerprintForMetric(clientmodel.Metric) (*clientmodel.Fingerprint, bool, error)
// GetFingerprintsForLabelMatchers(LabelPair) (clientmodel.Fingerprints, bool, error)
// GetFingerprintsForLabelMatchers(metric.LabelPair) (clientmodel.Fingerprints, bool, error)
// }
// type MetricResolver interface {
@ -77,8 +78,8 @@ var (
type leveldbOpener func()
// Close closes all the underlying persistence layers. It implements the
// MetricPersistence interface.
func (l *LevelDBMetricPersistence) Close() {
// Persistence interface.
func (l *LevelDBPersistence) Close() {
var persistences = []raw.Database{
l.CurationRemarks,
l.FingerprintToMetrics,
@ -106,12 +107,12 @@ func (l *LevelDBMetricPersistence) Close() {
closerGroup.Wait()
}
// NewLevelDBMetricPersistence returns a LevelDBMetricPersistence object ready
// NewLevelDBPersistence returns a LevelDBPersistence object ready
// to use.
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
func NewLevelDBPersistence(baseDirectory string) (*LevelDBPersistence, error) {
workers := utility.NewUncertaintyGroup(7)
emission := &LevelDBMetricPersistence{}
emission := &LevelDBPersistence{}
var subsystemOpeners = []struct {
name string
@ -245,8 +246,8 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
return emission, nil
}
// AppendSample implements the MetricPersistence interface.
func (l *LevelDBMetricPersistence) AppendSample(sample *clientmodel.Sample) (err error) {
// AppendSample implements the Persistence interface.
func (l *LevelDBPersistence) AppendSample(sample *clientmodel.Sample) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -292,7 +293,7 @@ func groupByFingerprint(samples clientmodel.Samples) map[clientmodel.Fingerprint
return fingerprintToSamples
}
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.Fingerprint]clientmodel.Samples) (err error) {
func (l *LevelDBPersistence) refreshHighWatermarks(groups map[clientmodel.Fingerprint]clientmodel.Samples) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -312,7 +313,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.
}
// AppendSamples appends the given Samples to the database and indexes them.
func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) {
func (l *LevelDBPersistence) AppendSamples(samples clientmodel.Samples) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -342,7 +343,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
key := &SampleKey{}
keyDto := &dto.SampleKey{}
values := make(Values, 0, *leveldbChunkSize)
values := make(metric.Values, 0, *leveldbChunkSize)
for fingerprint, group := range fingerprintToSamples {
for {
@ -369,12 +370,12 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
key.Dump(keyDto)
for _, sample := range chunk {
values = append(values, SamplePair{
values = append(values, metric.SamplePair{
Timestamp: sample.Timestamp,
Value: sample.Value,
})
}
val := values.marshal(nil)
val := marshalValues(values, nil)
samplesBatch.PutRaw(keyDto, val)
}
}
@ -409,7 +410,7 @@ func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) {
return key, nil
}
func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value bool, err error) {
func (l *LevelDBPersistence) hasIndexMetric(m clientmodel.Metric) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -422,8 +423,8 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value b
// GetFingerprintsForLabelMatchers returns the Fingerprints for the given
// LabelMatchers by querying the underlying LabelPairFingerprintIndex and
// possibly the LabelNameLabelValuesIndex for each matcher. It implements the
// MetricPersistence interface.
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelMatchers(labelMatchers LabelMatchers) (fps clientmodel.Fingerprints, err error) {
// Persistence interface.
func (l *LevelDBPersistence) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) (fps clientmodel.Fingerprints, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -436,8 +437,8 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelMatchers(labelMatchers
set := utility.Set{}
switch matcher.Type {
case Equal:
fps, _, err := l.LabelPairToFingerprints.Lookup(&LabelPair{
case metric.Equal:
fps, _, err := l.LabelPairToFingerprints.Lookup(&metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
})
@ -458,7 +459,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelMatchers(labelMatchers
return nil, nil
}
for _, v := range matches {
fps, _, err := l.LabelPairToFingerprints.Lookup(&LabelPair{
fps, _, err := l.LabelPairToFingerprints.Lookup(&metric.LabelPair{
Name: matcher.Name,
Value: v,
})
@ -492,8 +493,8 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelMatchers(labelMatchers
// GetLabelValuesForLabelName returns the LabelValues for the given LabelName
// from the underlying LabelNameLabelValuesIndex. It implements the
// MetricPersistence interface.
func (l *LevelDBMetricPersistence) GetLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
// Persistence interface.
func (l *LevelDBPersistence) GetLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
var err error
defer func(begin time.Time) {
duration := time.Since(begin)
@ -507,9 +508,9 @@ func (l *LevelDBMetricPersistence) GetLabelValuesForLabelName(labelName clientmo
}
// GetMetricForFingerprint returns the Metric for the given Fingerprint from the
// underlying FingerprintMetricIndex. It implements the MetricPersistence
// underlying FingerprintMetricIndex. It implements the Persistence
// interface.
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) {
func (l *LevelDBPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -524,7 +525,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Finger
// GetAllValuesForLabel gets all label values that are associated with the
// provided label name.
func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) {
func (l *LevelDBPersistence) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) {
filter := &LabelNameFilter{
labelName: labelName,
}
@ -543,7 +544,7 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
func (l *LevelDBMetricPersistence) Prune() {
func (l *LevelDBPersistence) Prune() {
l.CurationRemarks.Prune()
l.FingerprintToMetrics.Prune()
l.LabelNameToLabelValues.Prune()
@ -554,7 +555,7 @@ func (l *LevelDBMetricPersistence) Prune() {
}
// Sizes returns the sum of all sizes of the underlying databases.
func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
func (l *LevelDBPersistence) Sizes() (total uint64, err error) {
size := uint64(0)
if size, err = l.CurationRemarks.Size(); err != nil {
@ -596,7 +597,7 @@ func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
}
// States returns the DatabaseStates of all underlying databases.
func (l *LevelDBMetricPersistence) States() raw.DatabaseStates {
func (l *LevelDBPersistence) States() raw.DatabaseStates {
return raw.DatabaseStates{
l.CurationRemarks.State(),
l.FingerprintToMetrics.State(),
@ -617,7 +618,7 @@ type CollectLabelValuesOp struct {
// Operate implements storage.RecordOperator. 'key' is required to be a
// LabelPair. Its Value is appended to a slice of collected LabelValues.
func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
labelPair := key.(LabelPair)
labelPair := key.(metric.LabelPair)
op.labelValues = append(op.labelValues, labelPair.Value)
return
}
@ -634,7 +635,7 @@ func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error
return
}
out = LabelPair{
out = metric.LabelPair{
Name: clientmodel.LabelName(*unmarshaled.Name),
Value: clientmodel.LabelValue(*unmarshaled.Value),
}
@ -690,7 +691,7 @@ type LabelNameFilter struct {
// LabelPair. The result is ACCEPT if the Name of the LabelPair matches the
// LabelName of this LabelNameFilter.
func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
labelPair, ok := key.(LabelPair)
labelPair, ok := key.(metric.LabelPair)
if ok && labelPair.Name == f.labelName {
return storage.Accept
}


@ -0,0 +1,74 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tiered
import (
"encoding/binary"
"math"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
const (
// sampleSize is the number of bytes per sample in marshalled format.
sampleSize = 16
// formatVersion is used as a version marker in the marshalled format.
formatVersion = 1
// formatVersionSize is the number of bytes used by the serialized formatVersion.
formatVersionSize = 1
)
// marshal marshals a group of samples for being written to disk into dest or a
// new slice if dest has insufficient capacity.
func marshalValues(v metric.Values, dest []byte) []byte {
sz := formatVersionSize + len(v)*sampleSize
if cap(dest) < sz {
dest = make([]byte, sz)
} else {
dest = dest[0:sz]
}
dest[0] = formatVersion
for i, val := range v {
offset := formatVersionSize + i*sampleSize
binary.LittleEndian.PutUint64(dest[offset:], uint64(val.Timestamp.Unix()))
binary.LittleEndian.PutUint64(dest[offset+8:], math.Float64bits(float64(val.Value)))
}
return dest
}
// unmarshalValues decodes marshalled samples into dest and returns either dest
// or a new slice containing those values if dest has insufficient capacity.
func unmarshalValues(buf []byte, dest metric.Values) metric.Values {
if buf[0] != formatVersion {
panic("unsupported format version")
}
n := (len(buf) - formatVersionSize) / sampleSize
if cap(dest) < n {
dest = make(metric.Values, n)
} else {
dest = dest[0:n]
}
for i := 0; i < n; i++ {
offset := formatVersionSize + i*sampleSize
dest[i].Timestamp = clientmodel.TimestampFromUnix(int64(binary.LittleEndian.Uint64(buf[offset:])))
dest[i].Value = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(buf[offset+8:])))
}
return dest
}


@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"sort"
@ -19,6 +19,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility"
)
@ -27,50 +28,50 @@ import (
const initialSeriesArenaSize = 4 * 60
type stream interface {
add(Values)
add(metric.Values)
clone() Values
expunge(age clientmodel.Timestamp) Values
clone() metric.Values
expunge(age clientmodel.Timestamp) metric.Values
size() int
clear()
metric() clientmodel.Metric
getValueAtTime(t clientmodel.Timestamp) Values
getBoundaryValues(in Interval) Values
getRangeValues(in Interval) Values
getValueAtTime(t clientmodel.Timestamp) metric.Values
getBoundaryValues(in metric.Interval) metric.Values
getRangeValues(in metric.Interval) metric.Values
}
type arrayStream struct {
sync.RWMutex
m clientmodel.Metric
values Values
values metric.Values
}
func (s *arrayStream) metric() clientmodel.Metric {
return s.m
}
func (s *arrayStream) add(v Values) {
func (s *arrayStream) add(v metric.Values) {
s.Lock()
defer s.Unlock()
s.values = append(s.values, v...)
}
func (s *arrayStream) clone() Values {
func (s *arrayStream) clone() metric.Values {
s.RLock()
defer s.RUnlock()
clone := make(Values, len(s.values))
clone := make(metric.Values, len(s.values))
copy(clone, s.values)
return clone
}
func (s *arrayStream) expunge(t clientmodel.Timestamp) Values {
func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values {
s.Lock()
defer s.Unlock()
@ -85,7 +86,7 @@ func (s *arrayStream) expunge(t clientmodel.Timestamp) Values {
return expunged
}
func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) Values {
func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) metric.Values {
s.RLock()
defer s.RUnlock()
@ -93,29 +94,29 @@ func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) Values {
l := len(s.values)
switch l {
case 0:
return Values{}
return metric.Values{}
case 1:
return Values{s.values[0]}
return metric.Values{s.values[0]}
default:
index := sort.Search(l, func(i int) bool {
return !s.values[i].Timestamp.Before(t)
})
if index == 0 {
return Values{s.values[0]}
return metric.Values{s.values[0]}
}
if index == l {
return Values{s.values[l-1]}
return metric.Values{s.values[l-1]}
}
if s.values[index].Timestamp.Equal(t) {
return Values{s.values[index]}
return metric.Values{s.values[index]}
}
return Values{s.values[index-1], s.values[index]}
return metric.Values{s.values[index-1], s.values[index]}
}
}
func (s *arrayStream) getBoundaryValues(in Interval) Values {
func (s *arrayStream) getBoundaryValues(in metric.Interval) metric.Values {
s.RLock()
defer s.RUnlock()
@ -130,15 +131,15 @@ func (s *arrayStream) getBoundaryValues(in Interval) Values {
resultRange := s.values[oldest:newest]
switch len(resultRange) {
case 0:
return Values{}
return metric.Values{}
case 1:
return Values{resultRange[0]}
return metric.Values{resultRange[0]}
default:
return Values{resultRange[0], resultRange[len(resultRange)-1]}
return metric.Values{resultRange[0], resultRange[len(resultRange)-1]}
}
}
func (s *arrayStream) getRangeValues(in Interval) Values {
func (s *arrayStream) getRangeValues(in metric.Interval) metric.Values {
s.RLock()
defer s.RUnlock()
@ -150,7 +151,7 @@ func (s *arrayStream) getRangeValues(in Interval) Values {
return s.values[i].Timestamp.After(in.NewestInclusive)
})
result := make(Values, newest-oldest)
result := make(metric.Values, newest-oldest)
copy(result, s.values[oldest:newest])
return result
@ -161,13 +162,13 @@ func (s *arrayStream) size() int {
}
func (s *arrayStream) clear() {
s.values = Values{}
s.values = metric.Values{}
}
func newArrayStream(metric clientmodel.Metric) *arrayStream {
func newArrayStream(m clientmodel.Metric) *arrayStream {
return &arrayStream{
m: metric,
values: make(Values, 0, initialSeriesArenaSize),
m: m,
values: make(metric.Values, 0, initialSeriesArenaSize),
}
}
@ -176,7 +177,7 @@ type memorySeriesStorage struct {
wmCache *watermarkCache
fingerprintToSeries map[clientmodel.Fingerprint]stream
labelPairToFingerprints map[LabelPair]utility.Set
labelPairToFingerprints map[metric.LabelPair]utility.Set
labelNameToLabelValues map[clientmodel.LabelName]utility.Set
}
@ -203,8 +204,8 @@ func (s *memorySeriesStorage) AppendSample(sample *clientmodel.Sample) error {
fingerprint := &clientmodel.Fingerprint{}
fingerprint.LoadFromMetric(sample.Metric)
series := s.getOrCreateSeries(sample.Metric, fingerprint)
series.add(Values{
SamplePair{
series.add(metric.Values{
metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
},
@ -231,15 +232,15 @@ func (s *memorySeriesStorage) CreateEmptySeries(metric clientmodel.Metric) {
s.getOrCreateSeries(m, fingerprint)
}
func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, fingerprint *clientmodel.Fingerprint) stream {
series, ok := s.fingerprintToSeries[*fingerprint]
func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric, fp *clientmodel.Fingerprint) stream {
series, ok := s.fingerprintToSeries[*fp]
if !ok {
series = newArrayStream(metric)
s.fingerprintToSeries[*fingerprint] = series
series = newArrayStream(m)
s.fingerprintToSeries[*fp] = series
for k, v := range metric {
labelPair := LabelPair{
for k, v := range m {
labelPair := metric.LabelPair{
Name: k,
Value: v,
}
@ -249,7 +250,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, finge
fps = utility.Set{}
s.labelPairToFingerprints[labelPair] = fps
}
fps.Add(*fingerprint)
fps.Add(*fp)
values, ok := s.labelNameToLabelValues[k]
if !ok {
@ -318,7 +319,7 @@ func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) {
}
for k, v := range series.metric() {
labelPair := LabelPair{
labelPair := metric.LabelPair{
Name: k,
Value: v,
}
@ -335,7 +336,7 @@ func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) {
// Append raw samples, bypassing indexing. Only used to add data to views,
// which don't need to lookup by metric.
func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *clientmodel.Fingerprint, samples Values) {
func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *clientmodel.Fingerprint, samples metric.Values) {
s.Lock()
defer s.Unlock()
@ -349,15 +350,15 @@ func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *clientmo
series.add(samples)
}
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers LabelMatchers) (clientmodel.Fingerprints, error) {
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) (clientmodel.Fingerprints, error) {
s.RLock()
defer s.RUnlock()
sets := []utility.Set{}
for _, matcher := range labelMatchers {
switch matcher.Type {
case Equal:
set, ok := s.labelPairToFingerprints[LabelPair{
case metric.Equal:
set, ok := s.labelPairToFingerprints[metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
}]
@ -377,7 +378,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers Labe
}
set := utility.Set{}
for _, v := range matches {
subset, ok := s.labelPairToFingerprints[LabelPair{
subset, ok := s.labelPairToFingerprints[metric.LabelPair{
Name: matcher.Name,
Value: v,
}]
@ -452,7 +453,7 @@ func (s *memorySeriesStorage) HasFingerprint(f *clientmodel.Fingerprint) bool {
return has
}
func (s *memorySeriesStorage) CloneSamples(f *clientmodel.Fingerprint) Values {
func (s *memorySeriesStorage) CloneSamples(f *clientmodel.Fingerprint) metric.Values {
s.RLock()
defer s.RUnlock()
@ -464,7 +465,7 @@ func (s *memorySeriesStorage) CloneSamples(f *clientmodel.Fingerprint) Values {
return series.clone()
}
func (s *memorySeriesStorage) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) Values {
func (s *memorySeriesStorage) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) metric.Values {
s.RLock()
defer s.RUnlock()
@ -476,7 +477,7 @@ func (s *memorySeriesStorage) GetValueAtTime(f *clientmodel.Fingerprint, t clien
return series.getValueAtTime(t)
}
func (s *memorySeriesStorage) GetBoundaryValues(f *clientmodel.Fingerprint, i Interval) Values {
func (s *memorySeriesStorage) GetBoundaryValues(f *clientmodel.Fingerprint, i metric.Interval) metric.Values {
s.RLock()
defer s.RUnlock()
@ -488,7 +489,7 @@ func (s *memorySeriesStorage) GetBoundaryValues(f *clientmodel.Fingerprint, i In
return series.getBoundaryValues(i)
}
func (s *memorySeriesStorage) GetRangeValues(f *clientmodel.Fingerprint, i Interval) Values {
func (s *memorySeriesStorage) GetRangeValues(f *clientmodel.Fingerprint, i metric.Interval) metric.Values {
s.RLock()
defer s.RUnlock()
@ -531,7 +532,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelNa
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
return &memorySeriesStorage{
fingerprintToSeries: make(map[clientmodel.Fingerprint]stream),
labelPairToFingerprints: make(map[LabelPair]utility.Set),
labelPairToFingerprints: make(map[metric.LabelPair]utility.Set),
labelNameToLabelValues: make(map[clientmodel.LabelName]utility.Set),
wmCache: o.WatermarkCache,
}
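
Not part of the commit, but to illustrate the effect of the split in this file: callers of the relocated in-memory storage now import the interface types from storage/metric and the implementation from storage/metric/tiered. A minimal sketch under that assumption follows; the metric name and sample value are made up.

package main

import (
	"time"

	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/storage/metric"
	"github.com/prometheus/prometheus/storage/metric/tiered"
)

func main() {
	s := tiered.NewMemorySeriesStorage(tiered.MemorySeriesOptions{})

	m := clientmodel.Metric{clientmodel.MetricNameLabel: "requests_total"}
	now := clientmodel.TimestampFromTime(time.Now())

	// Write through the implementation...
	if err := s.AppendSample(&clientmodel.Sample{Metric: m, Value: 1, Timestamp: now}); err != nil {
		panic(err)
	}

	// ...and read back via the implementation-agnostic metric.Values type.
	fp := &clientmodel.Fingerprint{}
	fp.LoadFromMetric(m)
	var vals metric.Values = s.GetValueAtTime(fp, now)
	_ = vals
}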

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"
@ -20,14 +20,16 @@ import (
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
func BenchmarkStreamAdd(b *testing.B) {
b.StopTimer()
s := newArrayStream(clientmodel.Metric{})
samples := make(Values, b.N)
samples := make(metric.Values, b.N)
for i := 0; i < b.N; i++ {
samples = append(samples, SamplePair{
samples = append(samples, metric.SamplePair{
Timestamp: clientmodel.TimestampFromTime(time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC)),
Value: clientmodel.SampleValue(i),
})

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"
@ -19,35 +19,19 @@ import (
"time"
clientmodel "github.com/prometheus/client_golang/model"
)
// op encapsulates a primitive query operation.
type op interface {
// Fingerprint returns the fingerprint of the metric this operation
// operates on.
Fingerprint() *clientmodel.Fingerprint
// ExtractSamples extracts samples from a stream of values and advances
// the operation time.
ExtractSamples(Values) Values
// Consumed returns whether the operator has consumed all data it needs.
Consumed() bool
// CurrentTime gets the current operation time. In a newly created op,
// this is the starting time of the operation. During ongoing execution
// of the op, the current time is advanced accordingly. Once no
// subsequent work associated with the operation remains, nil is
// returned.
CurrentTime() clientmodel.Timestamp
}
"github.com/prometheus/prometheus/storage/metric"
)
// durationOperator encapsulates a general operation that occurs over a
// duration.
type durationOperator interface {
op
metric.Op
Through() clientmodel.Timestamp
}
// ops is a heap of operations, primary sorting key is the fingerprint.
type ops []op
type ops []metric.Op
// Len implements sort.Interface and heap.Interface.
func (o ops) Len() int {
@ -75,7 +59,7 @@ func (o ops) Swap(i, j int) {
func (o *ops) Push(x interface{}) {
// Push and Pop use pointer receivers because they modify the slice's
// length, not just its contents.
*o = append(*o, x.(op))
*o = append(*o, x.(metric.Op))
}
// Push implements heap.Interface.
@ -124,7 +108,7 @@ func (g *getValuesAtTimeOp) String() string {
return fmt.Sprintf("getValuesAtTimeOp at %s", g.current)
}
func (g *getValuesAtTimeOp) ExtractSamples(in Values) (out Values) {
func (g *getValuesAtTimeOp) ExtractSamples(in metric.Values) (out metric.Values) {
if len(in) == 0 {
return
}
@ -151,7 +135,7 @@ func (g *getValuesAlongRangeOp) Through() clientmodel.Timestamp {
return g.through
}
func (g *getValuesAlongRangeOp) ExtractSamples(in Values) (out Values) {
func (g *getValuesAlongRangeOp) ExtractSamples(in metric.Values) (out metric.Values) {
if len(in) == 0 {
return
}
@ -199,7 +183,7 @@ func (g *getValuesAtIntervalOp) String() string {
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.current, g.interval, g.through)
}
func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
func (g *getValuesAtIntervalOp) ExtractSamples(in metric.Values) (out metric.Values) {
if len(in) == 0 {
return
}
@ -208,7 +192,7 @@ func (g *getValuesAtIntervalOp) ExtractSamples(in Values) (out Values) {
if g.current.After(lastChunkTime) {
g.current = g.through.Add(clientmodel.MinimumTick)
return Values{in[len(in)-1]}
return metric.Values{in[len(in)-1]}
}
for len(in) > 0 {
@ -259,7 +243,7 @@ func (g *getValueRangeAtIntervalOp) advanceToNextInterval() {
g.current = g.rangeThrough.Add(-g.rangeDuration)
}
func (g *getValueRangeAtIntervalOp) ExtractSamples(in Values) (out Values) {
func (g *getValueRangeAtIntervalOp) ExtractSamples(in metric.Values) (out metric.Values) {
if len(in) == 0 {
return
}
@ -319,7 +303,7 @@ func (s getValuesAtIntervalOps) Less(i, j int) bool {
// are adjacent to it.
//
// An assumption of this is that the provided samples are already sorted!
func extractValuesAroundTime(t clientmodel.Timestamp, in Values) Values {
func extractValuesAroundTime(t clientmodel.Timestamp, in metric.Values) metric.Values {
i := sort.Search(len(in), func(i int) bool {
return !in[i].Timestamp.Before(t)
})
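
Not part of the commit: the query-operation interface whose comments are removed above now lives in storage/metric as metric.Op. A short sketch of how a consumer drives such an op, mirroring the loop used by the op tests later in this change; the package name is illustrative.

package example

import "github.com/prometheus/prometheus/storage/metric"

// drainOp repeatedly feeds a chunk of values to the operation until it
// reports that it has consumed everything it needs.
func drainOp(op metric.Op, chunk metric.Values) metric.Values {
	out := metric.Values{}
	for !op.Consumed() {
		out = append(out, op.ExtractSamples(chunk)...)
	}
	return out
}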

View file

@ -11,18 +11,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"testing"
"time"
"github.com/prometheus/prometheus/storage/metric"
)
func TestGetValuesAtTimeOp(t *testing.T) {
var scenarios = []struct {
op getValuesAtTimeOp
in Values
out Values
in metric.Values
out metric.Values
}{
// No values.
{
@ -35,13 +37,13 @@ func TestGetValuesAtTimeOp(t *testing.T) {
op: getValuesAtTimeOp{
baseOp: baseOp{current: testInstant},
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -53,13 +55,13 @@ func TestGetValuesAtTimeOp(t *testing.T) {
op: getValuesAtTimeOp{
baseOp: baseOp{current: testInstant.Add(1 * time.Minute)},
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -71,13 +73,13 @@ func TestGetValuesAtTimeOp(t *testing.T) {
op: getValuesAtTimeOp{
baseOp: baseOp{current: testInstant.Add(2 * time.Minute)},
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -89,7 +91,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
op: getValuesAtTimeOp{
baseOp: baseOp{current: testInstant},
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -99,7 +101,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -111,7 +113,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
op: getValuesAtTimeOp{
baseOp: baseOp{current: testInstant.Add(1 * time.Minute)},
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -121,7 +123,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -133,7 +135,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
op: getValuesAtTimeOp{
baseOp: baseOp{current: testInstant.Add(90 * time.Second)},
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -143,7 +145,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -159,7 +161,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
op: getValuesAtTimeOp{
baseOp: baseOp{current: testInstant.Add(2 * time.Minute)},
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -169,7 +171,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -185,7 +187,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
op: getValuesAtTimeOp{
baseOp: baseOp{current: testInstant.Add(3 * time.Minute)},
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -195,7 +197,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(2 * time.Minute),
Value: 1,
@ -221,8 +223,8 @@ func TestGetValuesAtTimeOp(t *testing.T) {
func TestGetValuesAtIntervalOp(t *testing.T) {
var scenarios = []struct {
op getValuesAtIntervalOp
in Values
out Values
in metric.Values
out metric.Values
}{
// No values.
{
@ -243,7 +245,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
},
interval: 30 * time.Second,
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(2 * time.Minute),
Value: 1,
@ -253,7 +255,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(2 * time.Minute),
Value: 1,
@ -269,7 +271,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
},
interval: 30 * time.Second,
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -279,7 +281,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -299,7 +301,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
},
interval: 30 * time.Second,
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant,
Value: 1,
@ -313,7 +315,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -333,7 +335,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
},
interval: 30 * time.Second,
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -343,7 +345,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -363,7 +365,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
},
interval: 30 * time.Second,
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant,
Value: 1,
@ -381,7 +383,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(2 * time.Minute),
Value: 1,
@ -401,7 +403,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
},
interval: 30 * time.Second,
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant,
Value: 1,
@ -411,7 +413,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -431,7 +433,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
},
interval: 3 * time.Minute,
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant,
Value: 1,
@ -449,7 +451,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant,
Value: 1,
@ -490,8 +492,8 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
func TestGetValuesAlongRangeOp(t *testing.T) {
var scenarios = []struct {
op getValuesAlongRangeOp
in Values
out Values
in metric.Values
out metric.Values
}{
// No values.
{
@ -506,7 +508,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
baseOp: baseOp{current: testInstant},
through: testInstant.Add(1 * time.Minute),
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(2 * time.Minute),
Value: 1,
@ -516,7 +518,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
Value: 1,
},
},
out: Values{},
out: metric.Values{},
},
// Operator range starts before first value, ends within available values.
{
@ -524,7 +526,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
baseOp: baseOp{current: testInstant},
through: testInstant.Add(2 * time.Minute),
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -534,7 +536,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -547,7 +549,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
baseOp: baseOp{current: testInstant.Add(1 * time.Minute)},
through: testInstant.Add(2 * time.Minute),
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant,
Value: 1,
@ -561,7 +563,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -574,7 +576,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
baseOp: baseOp{current: testInstant},
through: testInstant.Add(3 * time.Minute),
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -584,7 +586,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(1 * time.Minute),
Value: 1,
@ -601,7 +603,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
baseOp: baseOp{current: testInstant.Add(2 * time.Minute)},
through: testInstant.Add(4 * time.Minute),
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant,
Value: 1,
@ -619,7 +621,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
Value: 1,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(2 * time.Minute),
Value: 1,
@ -636,7 +638,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
baseOp: baseOp{current: testInstant.Add(2 * time.Minute)},
through: testInstant.Add(3 * time.Minute),
},
in: Values{
in: metric.Values{
{
Timestamp: testInstant,
Value: 1,
@ -646,7 +648,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
Value: 1,
},
},
out: Values{},
out: metric.Values{},
},
}
for i, scenario := range scenarios {
@ -677,13 +679,13 @@ func TestGetValueRangeAtIntervalOp(t *testing.T) {
var scenarios = []struct {
op getValueRangeAtIntervalOp
in Values
out Values
in metric.Values
out metric.Values
}{
// All values before the first range.
{
op: testOp,
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(-4 * time.Minute),
Value: 1,
@ -693,12 +695,12 @@ func TestGetValueRangeAtIntervalOp(t *testing.T) {
Value: 2,
},
},
out: Values{},
out: metric.Values{},
},
// Values starting before first range, ending after last.
// metric.Values starting before first range, ending after last.
{
op: testOp,
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(-4 * time.Minute),
Value: 1,
@ -756,7 +758,7 @@ func TestGetValueRangeAtIntervalOp(t *testing.T) {
Value: 14,
},
},
out: Values{
out: metric.Values{
{
Timestamp: testInstant.Add(-2 * time.Minute),
Value: 3,
@ -795,20 +797,20 @@ func TestGetValueRangeAtIntervalOp(t *testing.T) {
},
},
},
// Values starting after last range.
// metric.Values starting after last range.
{
op: testOp,
in: Values{
in: metric.Values{
{
Timestamp: testInstant.Add(21 * time.Minute),
Value: 14,
},
},
out: Values{},
out: metric.Values{},
},
}
for i, scenario := range scenarios {
actual := Values{}
actual := metric.Values{}
for !scenario.op.Consumed() {
actual = append(actual, scenario.op.ExtractSamples(scenario.in)...)
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"
@ -20,6 +20,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
@ -101,8 +102,8 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
}()
var pendingMutations = 0
var pendingSamples Values
var unactedSamples Values
var pendingSamples metric.Values
var unactedSamples metric.Values
var lastTouchedTime clientmodel.Timestamp
var keyDropped bool
@ -162,7 +163,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
case len(pendingSamples) == 0 && len(unactedSamples) >= p.minimumGroupSize:
lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
unactedSamples = Values{}
unactedSamples = metric.Values{}
case len(pendingSamples)+len(unactedSamples) < p.minimumGroupSize:
if !keyDropped {
@ -174,15 +175,15 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
}
pendingSamples = append(pendingSamples, unactedSamples...)
lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
unactedSamples = Values{}
unactedSamples = metric.Values{}
pendingMutations++
// If the number of pending writes equals the target group size
case len(pendingSamples) == p.minimumGroupSize:
k := &dto.SampleKey{}
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey := buildSampleKey(fingerprint, pendingSamples)
newSampleKey.Dump(k)
b := pendingSamples.marshal(nil)
b := marshalValues(pendingSamples, nil)
pendingBatch.PutRaw(k, b)
pendingMutations++
@ -201,7 +202,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
} else {
pendingSamples = unactedSamples
lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
unactedSamples = Values{}
unactedSamples = metric.Values{}
}
}
@ -229,11 +230,11 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
if len(unactedSamples) > 0 || len(pendingSamples) > 0 {
pendingSamples = append(pendingSamples, unactedSamples...)
k := &dto.SampleKey{}
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey := buildSampleKey(fingerprint, pendingSamples)
newSampleKey.Dump(k)
b := pendingSamples.marshal(nil)
b := marshalValues(pendingSamples, nil)
pendingBatch.PutRaw(k, b)
pendingSamples = Values{}
pendingSamples = metric.Values{}
pendingMutations++
lastCurated = newSampleKey.FirstTimestamp
}
@ -384,7 +385,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
sampleKey.Dump(k)
pendingBatch.Drop(k)
lastCurated = sampleKey.LastTimestamp
sampleValues = Values{}
sampleValues = metric.Values{}
pendingMutations++
case sampleKey.MayContain(stopAt):
@ -396,10 +397,10 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
sampleValues = sampleValues.TruncateBefore(stopAt)
if len(sampleValues) > 0 {
k := &dto.SampleKey{}
sampleKey = sampleValues.ToSampleKey(fingerprint)
sampleKey = buildSampleKey(fingerprint, sampleValues)
sampleKey.Dump(k)
lastCurated = sampleKey.FirstTimestamp
v := sampleValues.marshal(nil)
v := marshalValues(sampleValues, nil)
pendingBatch.PutRaw(k, v)
pendingMutations++
} else {

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"
@ -22,6 +22,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/raw/leveldb"
dto "github.com/prometheus/prometheus/model/generated"
@ -42,7 +43,7 @@ type watermarkState struct {
type sampleGroup struct {
fingerprint string
values Values
values metric.Values
}
type in struct {
@ -106,12 +107,12 @@ func (s sampleGroup) Get() (key proto.Message, value interface{}) {
k := &dto.SampleKey{}
keyRaw.Dump(k)
return k, s.values.marshal(nil)
return k, marshalValues(s.values, nil)
}
type noopUpdater struct{}
func (noopUpdater) UpdateCurationState(*CurationState) {}
func (noopUpdater) UpdateCurationState(*metric.CurationState) {}
func TestCuratorCompactionProcessor(t *testing.T) {
scenarios := []struct {
@ -169,7 +170,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroups: fixture.Pairs{
sampleGroup{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
@ -194,7 +195,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 0.25,
@ -219,7 +220,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 40 * time.Minute),
Value: 0.50,
@ -236,7 +237,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 25 * time.Minute),
Value: 0.75,
@ -245,7 +246,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 20 * time.Minute),
Value: -2,
@ -254,7 +255,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: -3,
@ -264,7 +265,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
@ -274,7 +275,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 89 * time.Minute),
Value: 1,
@ -284,7 +285,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 88 * time.Minute),
Value: 2,
@ -294,7 +295,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 87 * time.Minute),
Value: 3,
@ -304,7 +305,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 1
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 86 * time.Minute),
Value: 4,
@ -314,7 +315,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 5,
@ -324,7 +325,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 84 * time.Minute),
Value: 6,
@ -334,7 +335,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 83 * time.Minute),
Value: 7,
@ -344,7 +345,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 82 * time.Minute),
Value: 8,
@ -354,7 +355,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 2
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 81 * time.Minute),
Value: 9,
@ -364,7 +365,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 3
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 10,
@ -374,7 +375,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 3
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 79 * time.Minute),
Value: 11,
@ -384,7 +385,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 3
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 78 * time.Minute),
Value: 12,
@ -394,7 +395,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 3
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 77 * time.Minute),
Value: 13,
@ -404,7 +405,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Blocks 3 and 4 and 5
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
// Block 3
Timestamp: testInstant.Add(-1 * 76 * time.Minute),
@ -445,7 +446,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 5
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 69 * time.Minute),
Value: 21,
@ -455,7 +456,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 5
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 68 * time.Minute),
Value: 22,
@ -465,7 +466,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 5
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 67 * time.Minute),
Value: 23,
@ -475,7 +476,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 5
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 66 * time.Minute),
Value: 24,
@ -485,7 +486,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 25,
@ -495,7 +496,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 64 * time.Minute),
Value: 26,
@ -505,7 +506,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 63 * time.Minute),
Value: 27,
@ -515,7 +516,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 62 * time.Minute),
Value: 28,
@ -525,7 +526,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 6
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 61 * time.Minute),
Value: 29,
@ -535,7 +536,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroup{
// Moved into Block 7
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
@ -577,7 +578,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleGroups: []sampleGroup{
{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
@ -602,7 +603,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 0.25,
@ -627,7 +628,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 40 * time.Minute),
Value: 0.50,
@ -644,7 +645,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 25 * time.Minute),
Value: 0.75,
@ -653,7 +654,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 20 * time.Minute),
Value: -2,
@ -662,7 +663,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: -3,
@ -672,7 +673,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
{
// Block 1
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
@ -698,7 +699,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
{
// Block 2
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 5,
@ -724,7 +725,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
{
// Block 3
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 10,
@ -749,7 +750,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
Value: 15,
@ -774,7 +775,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
Value: 20,
@ -799,7 +800,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 25,
@ -824,7 +825,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
},
{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
@ -1040,7 +1041,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
sampleGroups: fixture.Pairs{
sampleGroup{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 90,
@ -1053,7 +1054,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: 15,
@ -1062,7 +1063,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
@ -1071,7 +1072,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 89 * time.Minute),
Value: 1,
@ -1080,7 +1081,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 88 * time.Minute),
Value: 2,
@ -1089,7 +1090,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 87 * time.Minute),
Value: 3,
@ -1098,7 +1099,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 86 * time.Minute),
Value: 4,
@ -1107,7 +1108,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 5,
@ -1116,7 +1117,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 84 * time.Minute),
Value: 6,
@ -1125,7 +1126,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 83 * time.Minute),
Value: 7,
@ -1134,7 +1135,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 82 * time.Minute),
Value: 8,
@ -1143,7 +1144,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 81 * time.Minute),
Value: 9,
@ -1152,7 +1153,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 10,
@ -1161,7 +1162,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 79 * time.Minute),
Value: 11,
@ -1170,7 +1171,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 78 * time.Minute),
Value: 12,
@ -1179,7 +1180,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 77 * time.Minute),
Value: 13,
@ -1188,7 +1189,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 76 * time.Minute),
Value: 14,
@ -1221,7 +1222,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 69 * time.Minute),
Value: 21,
@ -1230,7 +1231,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 68 * time.Minute),
Value: 22,
@ -1239,7 +1240,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 67 * time.Minute),
Value: 23,
@ -1248,7 +1249,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 66 * time.Minute),
Value: 24,
@ -1257,7 +1258,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 25,
@ -1266,7 +1267,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 64 * time.Minute),
Value: 26,
@ -1275,7 +1276,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 63 * time.Minute),
Value: 27,
@ -1284,7 +1285,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 62 * time.Minute),
Value: 28,
@ -1293,7 +1294,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 61 * time.Minute),
Value: 29,
@ -1302,7 +1303,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
@ -1333,7 +1334,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
sampleGroups: []sampleGroup{
{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
Value: 30,
@ -1342,7 +1343,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
{
fingerprint: "0001-A-1-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: 15,
@ -1351,7 +1352,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
},
{
fingerprint: "0002-A-2-Z",
values: Values{
values: metric.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,

View file

@ -11,17 +11,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"testing"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
)
func GetFingerprintsForLabelSetUsesAndForLabelMatchingTests(p MetricPersistence, t test.Tester) {
func GetFingerprintsForLabelSetUsesAndForLabelMatchingTests(p metric.Persistence, t test.Tester) {
metrics := []clientmodel.LabelSet{
{clientmodel.MetricNameLabel: "request_metrics_latency_equal_tallying_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},
{clientmodel.MetricNameLabel: "requests_metrics_latency_equal_accumulating_microseconds", "instance": "http://localhost:9090/metrics.json", "percentile": "0.010000"},

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"testing"
@ -19,10 +19,11 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
)
func GetValueAtTimeTests(persistenceMaker func() (ViewableMetricPersistence, test.Closer), t test.Tester) {
func GetValueAtTimeTests(persistenceMaker func() (metric.ViewablePersistence, test.Closer), t test.Tester) {
type value struct {
year int
month time.Month
@ -355,7 +356,7 @@ func GetValueAtTimeTests(persistenceMaker func() (ViewableMetricPersistence, tes
}
}
func GetRangeValuesTests(persistenceMaker func() (ViewableMetricPersistence, test.Closer), onlyBoundaries bool, t test.Tester) {
func GetRangeValuesTests(persistenceMaker func() (metric.ViewablePersistence, test.Closer), onlyBoundaries bool, t test.Tester) {
type value struct {
year int
month time.Month
@ -830,12 +831,12 @@ func GetRangeValuesTests(persistenceMaker func() (ViewableMetricPersistence, tes
input := behavior.input
open := clientmodel.TimestampFromTime(time.Date(input.openYear, input.openMonth, input.openDay, input.openHour, 0, 0, 0, time.UTC))
end := clientmodel.TimestampFromTime(time.Date(input.endYear, input.endMonth, input.endDay, input.endHour, 0, 0, 0, time.UTC))
in := Interval{
in := metric.Interval{
OldestInclusive: open,
NewestInclusive: end,
}
actualValues := Values{}
actualValues := metric.Values{}
expectedValues := []output{}
fp := &clientmodel.Fingerprint{}
fp.LoadFromMetric(m)
@ -899,7 +900,7 @@ func GetRangeValuesTests(persistenceMaker func() (ViewableMetricPersistence, tes
// Test Definitions Follow
func testMemoryGetValueAtTime(t test.Tester) {
persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
persistenceMaker := func() (metric.ViewablePersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
@ -927,7 +928,7 @@ func BenchmarkMemoryGetBoundaryValues(b *testing.B) {
}
func testMemoryGetRangeValues(t test.Tester) {
persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
persistenceMaker := func() (metric.ViewablePersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
@ -935,7 +936,7 @@ func testMemoryGetRangeValues(t test.Tester) {
}
func testMemoryGetBoundaryValues(t test.Tester) {
persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
persistenceMaker := func() (metric.ViewablePersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}

View file

@ -1,10 +1,12 @@
package metric
package tiered
import (
"math/rand"
"testing"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
const numTestValues = 5000
@ -12,7 +14,7 @@ const numTestValues = 5000
func TestValuesMarshalAndUnmarshal(t *testing.T) {
values := randomValues(numTestValues)
marshalled := values.marshal(nil)
marshalled := marshalValues(values, nil)
unmarshalled := unmarshalValues(marshalled, nil)
for i, expected := range values {
@ -23,10 +25,10 @@ func TestValuesMarshalAndUnmarshal(t *testing.T) {
}
}
func randomValues(numSamples int) Values {
v := make(Values, 0, numSamples)
func randomValues(numSamples int) metric.Values {
v := make(metric.Values, 0, numSamples)
for i := 0; i < numSamples; i++ {
v = append(v, SamplePair{
v = append(v, metric.SamplePair{
Timestamp: clientmodel.Timestamp(rand.Int63()),
Value: clientmodel.SampleValue(rand.NormFloat64()),
})
@ -42,7 +44,7 @@ func benchmarkMarshal(b *testing.B, n int) {
// TODO: Reuse buffer to compare performance.
// - Delta is -30 percent time overhead.
for i := 0; i < b.N; i++ {
v.marshal(nil)
marshalValues(v, nil)
}
}
@ -68,7 +70,7 @@ func BenchmarkMarshal10000(b *testing.B) {
func benchmarkUnmarshal(b *testing.B, n int) {
v := randomValues(numTestValues)
marshalled := v.marshal(nil)
marshalled := marshalValues(v, nil)
b.ResetTimer()
// TODO: Reuse buffer to compare performance.

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"
@ -21,6 +21,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/storage/metric"
dto "github.com/prometheus/prometheus/model/generated"
)
@ -128,3 +129,13 @@ func (s *SampleKey) Load(d *dto.SampleKey) {
s.LastTimestamp = clientmodel.TimestampFromUnix(d.GetLastTimestamp())
s.SampleCount = d.GetSampleCount()
}
// buildSampleKey returns the SampleKey for the given Fingerprint and Values.
func buildSampleKey(f *clientmodel.Fingerprint, v metric.Values) *SampleKey {
return &SampleKey{
Fingerprint: f,
FirstTimestamp: v[0].Timestamp,
LastTimestamp: v[len(v)-1].Timestamp,
SampleCount: uint32(len(v)),
}
}
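
Not part of the commit: a sketch of how the curation processors earlier in this change pair the new buildSampleKey helper with marshalValues when rewriting a group of samples. The helper name below is illustrative, and it assumes a non-empty slice, which the callers guarantee.

package tiered

import (
	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/storage/metric"

	dto "github.com/prometheus/prometheus/model/generated"
)

// encodeSampleGroup builds the key/value pair that the compaction and
// deletion processors put into their pending LevelDB batch.
func encodeSampleGroup(fp *clientmodel.Fingerprint, samples metric.Values) (*dto.SampleKey, []byte) {
	k := &dto.SampleKey{}
	buildSampleKey(fp, samples).Dump(k) // key spans the [first, last] timestamps and carries the count
	return k, marshalValues(samples, nil)
}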

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"
@ -24,6 +24,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
dto "github.com/prometheus/prometheus/model/generated"
@ -31,17 +32,17 @@ import (
const stochasticMaximumVariance = 8
func BasicLifecycleTests(p MetricPersistence, t test.Tester) {
func BasicLifecycleTests(p metric.Persistence, t test.Tester) {
if p == nil {
t.Errorf("Received nil Metric Persistence.\n")
return
}
}
func ReadEmptyTests(p MetricPersistence, t test.Tester) {
func ReadEmptyTests(p metric.Persistence, t test.Tester) {
hasLabelPair := func(x int) (success bool) {
fingerprints, err := p.GetFingerprintsForLabelMatchers(LabelMatchers{{
Type: Equal,
fingerprints, err := p.GetFingerprintsForLabelMatchers(metric.LabelMatchers{{
Type: metric.Equal,
Name: clientmodel.LabelName(string(x)),
Value: clientmodel.LabelValue(string(x)),
}})
@ -88,7 +89,7 @@ func ReadEmptyTests(p MetricPersistence, t test.Tester) {
}
}
func AppendSampleAsPureSparseAppendTests(p MetricPersistence, t test.Tester) {
func AppendSampleAsPureSparseAppendTests(p metric.Persistence, t test.Tester) {
appendSample := func(x int) (success bool) {
v := clientmodel.SampleValue(x)
ts := clientmodel.TimestampFromUnix(int64(x))
@ -117,7 +118,7 @@ func AppendSampleAsPureSparseAppendTests(p MetricPersistence, t test.Tester) {
}
}
func AppendSampleAsSparseAppendWithReadsTests(p MetricPersistence, t test.Tester) {
func AppendSampleAsSparseAppendWithReadsTests(p metric.Persistence, t test.Tester) {
appendSample := func(x int) (success bool) {
v := clientmodel.SampleValue(x)
ts := clientmodel.TimestampFromUnix(int64(x))
@ -147,8 +148,8 @@ func AppendSampleAsSparseAppendWithReadsTests(p MetricPersistence, t test.Tester
return
}
fingerprints, err := p.GetFingerprintsForLabelMatchers(LabelMatchers{{
Type: Equal,
fingerprints, err := p.GetFingerprintsForLabelMatchers(metric.LabelMatchers{{
Type: metric.Equal,
Name: labelName,
Value: labelValue,
}})
@ -169,7 +170,7 @@ func AppendSampleAsSparseAppendWithReadsTests(p MetricPersistence, t test.Tester
}
}
func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Tester) {
func AppendSampleAsPureSingleEntityAppendTests(p metric.Persistence, t test.Tester) {
appendSample := func(x int) bool {
sample := &clientmodel.Sample{
Value: clientmodel.SampleValue(x),
@ -187,7 +188,7 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste
}
}
func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *clientmodel.Fingerprint, i Interval) (samples Values, err error) {
func levelDBGetRangeValues(l *LevelDBPersistence, fp *clientmodel.Fingerprint, i metric.Interval) (samples metric.Values, err error) {
fpDto := &dto.Fingerprint{}
dumpFingerprint(fpDto, fp)
k := &dto.SampleKey{
@ -236,7 +237,7 @@ func (t timeslice) Less(i, j int) bool {
return t[i].Before(t[j])
}
func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) {
func StochasticTests(persistenceMaker func() (metric.Persistence, test.Closer), t test.Tester) {
stochastic := func(x int) (success bool) {
p, closer := persistenceMaker()
defer closer.Close()
@ -328,8 +329,8 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
metricNewestSample[metricIndex] = newestSample
for sharedLabelIndex := 0; sharedLabelIndex < numberOfSharedLabels; sharedLabelIndex++ {
matchers := LabelMatchers{{
Type: Equal,
matchers := metric.LabelMatchers{{
Type: metric.Equal,
Name: clientmodel.LabelName(fmt.Sprintf("shared_label_%d", sharedLabelIndex)),
Value: clientmodel.LabelValue(fmt.Sprintf("label_%d", sharedLabelIndex)),
}}
@ -350,8 +351,8 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
for unsharedLabelIndex := 0; unsharedLabelIndex < numberOfUnsharedLabels; unsharedLabelIndex++ {
labelName := clientmodel.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, unsharedLabelIndex))
labelValue := clientmodel.LabelValue(fmt.Sprintf("private_label_%d", unsharedLabelIndex))
matchers := LabelMatchers{{
Type: Equal,
matchers := metric.LabelMatchers{{
Type: metric.Equal,
Name: labelName,
Value: labelValue,
}}
@ -367,21 +368,21 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
}
}
metric := clientmodel.Metric{}
metric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex))
m := clientmodel.Metric{}
m[clientmodel.MetricNameLabel] = clientmodel.LabelValue(fmt.Sprintf("metric_index_%d", metricIndex))
for i := 0; i < numberOfSharedLabels; i++ {
l := clientmodel.LabelName(fmt.Sprintf("shared_label_%d", i))
v := clientmodel.LabelValue(fmt.Sprintf("label_%d", i))
metric[l] = v
m[l] = v
}
for i := 0; i < numberOfUnsharedLabels; i++ {
l := clientmodel.LabelName(fmt.Sprintf("metric_index_%d_private_label_%d", metricIndex, i))
v := clientmodel.LabelValue(fmt.Sprintf("private_label_%d", i))
metric[l] = v
m[l] = v
}
for i := 0; i < numberOfRangeScans; i++ {
@ -428,21 +429,21 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
begin, end = second, first
}
interval := Interval{
interval := metric.Interval{
OldestInclusive: clientmodel.TimestampFromUnix(begin),
NewestInclusive: clientmodel.TimestampFromUnix(end),
}
samples := Values{}
samples := metric.Values{}
fp := &clientmodel.Fingerprint{}
fp.LoadFromMetric(metric)
fp.LoadFromMetric(m)
switch persistence := p.(type) {
case View:
case metric.View:
samples = persistence.GetRangeValues(fp, interval)
if len(samples) < 2 {
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
}
case *LevelDBMetricPersistence:
case *LevelDBPersistence:
var err error
samples, err = levelDBGetRangeValues(persistence, fp, interval)
if err != nil {
@ -452,7 +453,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
}
default:
t.Error("Unexpected type of MetricPersistence.")
t.Error("Unexpected type of metric.Persistence.")
}
}
}
@ -528,10 +529,10 @@ func BenchmarkLevelDBAppendSampleAsPureSingleEntityAppend(b *testing.B) {
}
func testLevelDBStochastic(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
persistenceMaker := func() (metric.Persistence, test.Closer) {
temporaryDirectory := test.NewTemporaryDirectory("test_leveldb_stochastic", t)
p, err := NewLevelDBMetricPersistence(temporaryDirectory.Path())
p, err := NewLevelDBPersistence(temporaryDirectory.Path())
if err != nil {
t.Errorf("Could not start up LevelDB: %q\n", err)
}
@ -613,7 +614,7 @@ func BenchmarkMemoryAppendSampleAsPureSingleEntityAppend(b *testing.B) {
}
func testMemoryStochastic(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
persistenceMaker := func() (metric.Persistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"fmt"
@ -26,11 +26,12 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
)
type chunk Values
type chunk metric.Values
// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
@ -73,7 +74,7 @@ type TieredStorage struct {
mu sync.RWMutex
// BUG(matt): This introduces a Law of Demeter violation. Ugh.
DiskStorage *LevelDBMetricPersistence
DiskStorage *LevelDBPersistence
appendToDiskQueue chan clientmodel.Samples
@ -101,8 +102,8 @@ type TieredStorage struct {
// viewJob encapsulates a request to extract sample values from the datastore.
type viewJob struct {
builder ViewRequestBuilder
output chan View
builder metric.ViewRequestBuilder
output chan metric.View
abort chan bool
err chan error
stats *stats.TimerGroup
@ -128,7 +129,7 @@ func NewTieredStorage(
}
}
diskStorage, err := NewLevelDBMetricPersistence(rootDirectory)
diskStorage, err := NewLevelDBPersistence(rootDirectory)
if err != nil {
return nil, err
}
@ -200,9 +201,15 @@ func (t *TieredStorage) drain(drained chan<- bool) {
t.draining <- (drained)
}
// MakeView materializes a View according to a ViewRequestBuilder, subject to a
// NewViewRequestBuilder furnishes a ViewRequestBuilder for remarking what types
// of queries to perform.
func (t *TieredStorage) NewViewRequestBuilder() metric.ViewRequestBuilder {
return &viewRequestBuilder{storage: t}
}
// makeView materializes a View according to a ViewRequestBuilder, subject to a
// timeout.
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) {
func (t *TieredStorage) makeView(builder metric.ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (metric.View, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
@ -210,9 +217,9 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
}
// The result channel needs a one-element buffer in case we have timed
// out in MakeView, but the view rendering still completes afterwards
// out in makeView, but the view rendering still completes afterwards
// and writes to the channel.
result := make(chan View, 1)
result := make(chan metric.View, 1)
// The abort channel needs a one-element buffer in case the view
// rendering has already exited and doesn't consume from the channel
// anymore.
@ -442,7 +449,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
memValues := t.memoryArena.CloneSamples(fp)
for !op.Consumed() {
// Abort the view rendering if the caller (MakeView) has timed out.
// Abort the view rendering if the caller (makeView) has timed out.
if len(viewJob.abort) > 0 {
return
}
@ -529,7 +536,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// Extract all needed data from the current chunk and append the
// extracted samples to the materialized view.
for !op.Consumed() && !op.CurrentTime().After(targetTime) {
view.appendSamples(fp, op.ExtractSamples(Values(currentChunk)))
view.appendSamples(fp, op.ExtractSamples(metric.Values(currentChunk)))
}
}
}
@ -545,7 +552,7 @@ func (t *TieredStorage) loadChunkAroundTime(
ts clientmodel.Timestamp,
firstBlock,
lastBlock *SampleKey,
) (chunk Values, expired bool) {
) (chunk metric.Values, expired bool) {
if fingerprint.Less(firstBlock.Fingerprint) {
return nil, false
}
@ -574,7 +581,7 @@ func (t *TieredStorage) loadChunkAroundTime(
return chunk, true
}
var foundValues Values
var foundValues metric.Values
if err := iterator.Key(dto); err != nil {
panic(err)
@ -675,7 +682,7 @@ func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (c
// GetFingerprintsForLabelMatchers gets all of the metric fingerprints that are
// associated with the provided label matchers.
func (t *TieredStorage) GetFingerprintsForLabelMatchers(matchers LabelMatchers) (clientmodel.Fingerprints, error) {
func (t *TieredStorage) GetFingerprintsForLabelMatchers(matchers metric.LabelMatchers) (clientmodel.Fingerprints, error) {
t.mu.RLock()
defer t.mu.RUnlock()
@ -704,6 +711,15 @@ func (t *TieredStorage) GetFingerprintsForLabelMatchers(matchers LabelMatchers)
return fingerprints, nil
}
// Get all of the label values that are associated with a given label name.
func (t *TieredStorage) GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error) {
// TODO(julius): Implement this or decide what to do with this
// Persistence interface method. It's currently unused on the
// TieredStorage, but used on the LevelDBPersistence and the
// memorySeriesStorage.
panic("not implemented")
}
// GetMetricForFingerprint gets the metric associated with the provided
// fingerprint.
func (t *TieredStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) {
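
Taken together, the hunks in this file unexport MakeView and move view construction behind NewViewRequestBuilder and the builder's Execute method. A minimal sketch of the resulting caller-side flow, with setup assumed (ts is a *TieredStorage, fp a *clientmodel.Fingerprint), not a verbatim excerpt of the commit:

builder := ts.NewViewRequestBuilder()
builder.GetMetricAtTime(fp, clientmodel.Now())

// Execute stands in for the former ts.MakeView(builder, deadline, stats) call.
view, err := builder.Execute(5*time.Second, stats.NewTimerGroup())
if err != nil {
	glog.Fatal(err)
}

// The returned metric.View is read exactly as before.
interval := metric.Interval{OldestInclusive: math.MinInt64, NewestInclusive: math.MaxInt64}
values := view.GetRangeValues(fp, interval)
_ = values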

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"math"
@ -22,6 +22,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
)
@ -42,9 +43,9 @@ func buildSamples(from, to clientmodel.Timestamp, interval time.Duration, m clie
return
}
func buildValues(firstValue clientmodel.SampleValue, from, to clientmodel.Timestamp, interval time.Duration) (v Values) {
func buildValues(firstValue clientmodel.SampleValue, from, to clientmodel.Timestamp, interval time.Duration) (v metric.Values) {
for from.Before(to) {
v = append(v, SamplePair{
v = append(v, metric.SamplePair{
Value: firstValue,
Timestamp: from,
})
@ -64,13 +65,13 @@ func testMakeView(t test.Tester, flushToDisk bool) {
}
type out struct {
atTime []Values
atInterval []Values
alongRange []Values
atTime []metric.Values
atInterval []metric.Values
alongRange []metric.Values
}
metric := clientmodel.Metric{clientmodel.MetricNameLabel: "request_count"}
m := clientmodel.Metric{clientmodel.MetricNameLabel: "request_count"}
fingerprint := &clientmodel.Fingerprint{}
fingerprint.LoadFromMetric(metric)
fingerprint.LoadFromMetric(m)
var (
instant = clientmodel.TimestampFromTime(time.Date(1984, 3, 30, 0, 0, 0, 0, time.Local))
scenarios = []struct {
@ -89,14 +90,14 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{{}},
atTime: []metric.Values{{}},
},
},
// Single sample, query asks for exact sample time.
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant,
},
@ -109,7 +110,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant,
@ -123,12 +124,12 @@ func testMakeView(t test.Tester, flushToDisk bool) {
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant.Add(time.Second),
},
{
Metric: metric,
Metric: m,
Value: 1,
Timestamp: instant.Add(time.Second * 2),
},
@ -141,7 +142,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant.Add(time.Second),
@ -155,7 +156,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant,
},
@ -168,7 +169,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant,
@ -182,12 +183,12 @@ func testMakeView(t test.Tester, flushToDisk bool) {
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant,
},
{
Metric: metric,
Metric: m,
Value: 1,
Timestamp: instant.Add(time.Second),
},
@ -200,7 +201,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant,
@ -214,17 +215,17 @@ func testMakeView(t test.Tester, flushToDisk bool) {
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant,
},
{
Metric: metric,
Metric: m,
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Metric: metric,
Metric: m,
Value: 2,
Timestamp: instant.Add(time.Second * 2),
},
@ -237,7 +238,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant.Add(time.Second),
@ -251,17 +252,17 @@ func testMakeView(t test.Tester, flushToDisk bool) {
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant,
},
{
Metric: metric,
Metric: m,
Value: 1,
Timestamp: instant.Add(time.Second * 2),
},
{
Metric: metric,
Metric: m,
Value: 2,
Timestamp: instant.Add(time.Second * 4),
},
@ -274,7 +275,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant,
@ -292,17 +293,17 @@ func testMakeView(t test.Tester, flushToDisk bool) {
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant,
},
{
Metric: metric,
Metric: m,
Value: 1,
Timestamp: instant.Add(time.Second * 2),
},
{
Metric: metric,
Metric: m,
Value: 2,
Timestamp: instant.Add(time.Second * 4),
},
@ -315,7 +316,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant.Add(time.Second * 2),
@ -335,7 +336,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
instant,
instant.Add(time.Duration(*leveldbChunkSize*4)*time.Second),
2*time.Second,
metric,
m,
),
in: in{
atTime: []getValuesAtTimeOp{
@ -345,7 +346,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize*2)),
@ -365,7 +366,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
instant,
instant.Add(time.Duration(*leveldbChunkSize*4)*time.Second),
2*time.Second,
metric,
m,
),
in: in{
atTime: []getValuesAtTimeOp{
@ -375,7 +376,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atTime: []Values{
atTime: []metric.Values{
{
{
Timestamp: instant.Add(time.Second * (time.Duration(*leveldbChunkSize*2) - 2)),
@ -395,7 +396,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
instant,
instant.Add(time.Duration(*leveldbChunkSize*6)*time.Second),
2*time.Second,
metric,
m,
),
in: in{
atInterval: []getValuesAtIntervalOp{
@ -409,7 +410,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atInterval: []Values{
atInterval: []metric.Values{
{
{
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize*2-6)),
@ -437,7 +438,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
instant,
instant.Add(time.Duration(*leveldbChunkSize*6)*time.Second),
2*time.Second,
metric,
m,
),
in: in{
alongRange: []getValuesAlongRangeOp{
@ -448,7 +449,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
alongRange: []Values{buildValues(
alongRange: []metric.Values{buildValues(
clientmodel.SampleValue(198),
instant.Add(time.Second*time.Duration(*leveldbChunkSize*2-4)),
instant.Add(time.Second*time.Duration(*leveldbChunkSize*4+2)+clientmodel.MinimumTick),
@ -482,7 +483,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
instant,
instant.Add(time.Duration(*leveldbChunkSize*6)*time.Second),
2*time.Second,
metric,
m,
),
in: in{
atInterval: []getValuesAtIntervalOp{
@ -496,7 +497,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atInterval: []Values{
atInterval: []metric.Values{
// We need two overlapping buildValues() calls here since the last
// value of the second chunk is extracted twice (value 399, time
// offset 798s).
@ -528,7 +529,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant,
},
@ -545,7 +546,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atInterval: []Values{
atInterval: []metric.Values{
{
{
Timestamp: instant,
@ -559,7 +560,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
{
data: clientmodel.Samples{
{
Metric: metric,
Metric: m,
Value: 0,
Timestamp: instant.Add(time.Second),
},
@ -576,7 +577,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
},
},
out: out{
atInterval: []Values{
atInterval: []metric.Values{
{
{
Timestamp: instant.Add(time.Second),
@ -609,7 +610,7 @@ func testMakeView(t test.Tester, flushToDisk bool) {
tiered.Flush()
}
requestBuilder := NewViewRequestBuilder()
requestBuilder := tiered.NewViewRequestBuilder()
for _, atTime := range scenario.in.atTime {
requestBuilder.GetMetricAtTime(fingerprint, atTime.current)
@ -623,14 +624,14 @@ func testMakeView(t test.Tester, flushToDisk bool) {
requestBuilder.GetMetricRange(fingerprint, alongRange.current, alongRange.through)
}
v, err := tiered.MakeView(requestBuilder, time.Second*5, stats.NewTimerGroup())
v, err := requestBuilder.Execute(time.Second*5, stats.NewTimerGroup())
if err != nil {
t.Fatalf("%d. failed due to %s", i, err)
}
// To get all values in the View, ask for the 'forever' interval.
interval := Interval{OldestInclusive: math.MinInt64, NewestInclusive: math.MaxInt64}
interval := metric.Interval{OldestInclusive: math.MinInt64, NewestInclusive: math.MaxInt64}
for j, atTime := range scenario.out.atTime {
actual := v.GetRangeValues(fingerprint, interval)
@ -821,55 +822,55 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) {
tiered.Flush()
scenarios := []struct {
matchers LabelMatchers
matchers metric.LabelMatchers
fpCount int
}{
{
matchers: LabelMatchers{},
matchers: metric.LabelMatchers{},
fpCount: 0,
}, {
matchers: LabelMatchers{
matchers: metric.LabelMatchers{
{
Type: Equal,
Type: metric.Equal,
Name: clientmodel.MetricNameLabel,
Value: "http_requests",
},
},
fpCount: 2,
}, {
matchers: LabelMatchers{
matchers: metric.LabelMatchers{
{
Type: Equal,
Type: metric.Equal,
Name: clientmodel.MetricNameLabel,
Value: "http_requests",
}, {
Type: Equal,
Type: metric.Equal,
Name: "method",
Value: "/foo",
},
},
fpCount: 1,
}, {
matchers: LabelMatchers{
matchers: metric.LabelMatchers{
{
Type: Equal,
Type: metric.Equal,
Name: clientmodel.MetricNameLabel,
Value: "http_requests",
}, {
Type: Equal,
Type: metric.Equal,
Name: "method",
Value: "/bar",
},
},
fpCount: 1,
}, {
matchers: LabelMatchers{
matchers: metric.LabelMatchers{
{
Type: Equal,
Type: metric.Equal,
Name: clientmodel.MetricNameLabel,
Value: "http_requests",
}, {
Type: Equal,
Type: metric.Equal,
Name: "method",
Value: "/baz",
},
@ -891,18 +892,18 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) {
func TestTruncateBefore(t *testing.T) {
type in struct {
values Values
values metric.Values
time clientmodel.Timestamp
}
instant := clientmodel.Now()
var scenarios = []struct {
in in
out Values
out metric.Values
}{
{
in: in{
time: instant,
values: Values{
values: metric.Values{
{
Value: 0,
Timestamp: instant,
@ -925,7 +926,7 @@ func TestTruncateBefore(t *testing.T) {
},
},
},
out: Values{
out: metric.Values{
{
Value: 0,
Timestamp: instant,
@ -951,7 +952,7 @@ func TestTruncateBefore(t *testing.T) {
{
in: in{
time: instant.Add(2 * time.Second),
values: Values{
values: metric.Values{
{
Value: 0,
Timestamp: instant,
@ -974,7 +975,7 @@ func TestTruncateBefore(t *testing.T) {
},
},
},
out: Values{
out: metric.Values{
{
Value: 1,
Timestamp: instant.Add(time.Second),
@ -996,7 +997,7 @@ func TestTruncateBefore(t *testing.T) {
{
in: in{
time: instant.Add(5 * time.Second),
values: Values{
values: metric.Values{
{
Value: 0,
Timestamp: instant,
@ -1019,7 +1020,7 @@ func TestTruncateBefore(t *testing.T) {
},
},
},
out: Values{
out: metric.Values{
// Preserve the last value in case it needs to be used for the next set.
{
Value: 4,

View file

@ -11,68 +11,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"container/heap"
"time"
clientmodel "github.com/prometheus/client_golang/model"
)
var (
// firstSupertime is the smallest valid supertime that may be seeked to.
firstSupertime = []byte{0, 0, 0, 0, 0, 0, 0, 0}
// lastSupertime is the largest valid supertime that may be seeked to.
lastSupertime = []byte{127, 255, 255, 255, 255, 255, 255, 255}
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric"
)
// ViewRequestBuilder represents the summation of all datastore queries that
// shall be performed to extract values. Call the Get... methods to record the
// queries. Once done, use HasOp and PopOp to retrieve the resulting
// operations. The operations are sorted by their fingerprint (and, for equal
// fingerprints, by the StartsAt timestamp of their operation).
type ViewRequestBuilder interface {
// GetMetricAtTime records a query to get, for the given Fingerprint,
// either the value at that time if there is a match or the one or two
// values adjacent thereto.
GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp)
// GetMetricAtInterval records a query to get, for the given
// Fingerprint, either the value at that interval from From through
// Through if there is a match or the one or two values adjacent for
// each point.
GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration)
// GetMetricRange records a query to get, for the given Fingerprint, the
// values that occur inclusively from From through Through.
GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp)
// GetMetricRangeAtInterval records a query to get value ranges at
// intervals for the given Fingerprint:
//
// |----| |----| |----| |----|
// ^ ^ ^ ^ ^ ^
// | \------------/ \----/ |
// from interval rangeDuration through
GetMetricRangeAtInterval(fp *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration)
// PopOp emits the next operation in the queue (sorted by
// fingerprint). If called while HasOps returns false, the
// behavior is undefined.
PopOp() op
// HasOp returns true if there is at least one more operation in the
// queue.
HasOp() bool
}
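
The range-at-interval diagram in the interface being moved out of this file maps onto a single recording call; durations below are illustrative, and fp and builder are assumed from the surrounding context:

// Thirty-second ranges sampled every five minutes across the last hour.
now := clientmodel.Now()
builder.GetMetricRangeAtInterval(fp, now.Add(-time.Hour), now, 5*time.Minute, 30*time.Second)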
// viewRequestBuilder contains the various requests for data.
type viewRequestBuilder struct {
storage *TieredStorage
operations ops
}
// NewViewRequestBuilder furnishes a ViewRequestBuilder for remarking what types
// of queries to perform.
func NewViewRequestBuilder() *viewRequestBuilder {
return &viewRequestBuilder{}
}
var getValuesAtTimes = newValueAtTimeList(10 * 1024)
// GetMetricAtTime implements ViewRequestBuilder.
@ -101,9 +57,14 @@ func (v *viewRequestBuilder) GetMetricRangeAtInterval(fp *clientmodel.Fingerprin
heap.Push(&v.operations, getValuesAtIntervalAlongRanges.Get(fp, from, through, interval, rangeDuration))
}
// Execute implements ViewRequestBuilder.
func (v *viewRequestBuilder) Execute(deadline time.Duration, queryStats *stats.TimerGroup) (metric.View, error) {
return v.storage.makeView(v, deadline, queryStats)
}
// PopOp implements ViewRequestBuilder.
func (v *viewRequestBuilder) PopOp() op {
return heap.Pop(&v.operations).(op)
func (v *viewRequestBuilder) PopOp() metric.Op {
return heap.Pop(&v.operations).(metric.Op)
}
// HasOp implements ViewRequestBuilder.
@ -115,7 +76,7 @@ type view struct {
*memorySeriesStorage
}
func (v view) appendSamples(fingerprint *clientmodel.Fingerprint, samples Values) {
func (v view) appendSamples(fingerprint *clientmodel.Fingerprint, samples metric.Values) {
v.memorySeriesStorage.appendSamplesWithoutIndexing(fingerprint, samples)
}
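
With PopOp now returning metric.Op, the renderer's consumption loop keeps its shape. A compressed fragment of that loop, with fp, currentChunk, targetTime, and view assumed from renderView's scope:

for builder.HasOp() {
	op := builder.PopOp() // now a metric.Op rather than the unexported op type
	// Extract whatever the current chunk can satisfy, then move on to the
	// next chunk or the next operation.
	for !op.Consumed() && !op.CurrentTime().After(targetTime) {
		view.appendSamples(fp, op.ExtractSamples(metric.Values(currentChunk)))
	}
}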

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"testing"
@ -153,7 +153,7 @@ func testBuilder(t test.Tester) {
}
for i, scenario := range scenarios {
builder := NewViewRequestBuilder()
builder := &viewRequestBuilder{}
for _, atTime := range scenario.in.atTimes {
fingerprint := &clientmodel.Fingerprint{}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
package tiered
import (
"code.google.com/p/goprotobuf/proto"

View file

@ -29,6 +29,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/metric/tiered"
)
var (
@ -41,7 +42,7 @@ type SamplesDumper struct {
}
func (d *SamplesDumper) Operate(key, value interface{}) *storage.OperatorError {
sampleKey := key.(*metric.SampleKey)
sampleKey := key.(*tiered.SampleKey)
if *dieOnBadChunk && sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) {
glog.Fatalf("Chunk: First time (%v) after last time (%v): %v\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey)
}
@ -75,7 +76,7 @@ func main() {
glog.Fatal("Must provide a path...")
}
persistence, err := metric.NewLevelDBMetricPersistence(*storageRoot)
persistence, err := tiered.NewLevelDBMetricPersistence(*storageRoot)
if err != nil {
glog.Fatal(err)
}
@ -85,7 +86,7 @@ func main() {
csv.NewWriter(os.Stdout),
}
entire, err := persistence.MetricSamples.ForEach(&metric.MetricSamplesDecoder{}, &metric.AcceptAllFilter{}, dumper)
entire, err := persistence.MetricSamples.ForEach(&tiered.MetricSamplesDecoder{}, &tiered.AcceptAllFilter{}, dumper)
if err != nil {
glog.Fatal("Error dumping samples: ", err)
}

View file

@ -18,7 +18,7 @@ package main
import (
"flag"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/metric/tiered"
"time"
"github.com/golang/glog"
@ -35,7 +35,7 @@ func main() {
glog.Fatal("Must provide a path...")
}
persistences, err := metric.NewLevelDBMetricPersistence(*storageRoot)
persistences, err := tiered.NewLevelDBMetricPersistence(*storageRoot)
if err != nil {
glog.Fatal(err)
}

View file

@ -29,7 +29,7 @@ type MetricsService struct {
time utility.Time
Config *config.Config
TargetManager retrieval.TargetManager
Storage *metric.TieredStorage
Storage metric.PreloadingPersistence
}
func (msrv *MetricsService) RegisterHandler() {
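
With the Storage field typed as the metric.PreloadingPersistence interface, the web layer compiles without referencing the tiered package; only the wiring code names the implementation. A hedged sketch of that wiring (the api package name and the surrounding setup are assumptions, not part of this hunk):

// In startup/wiring code, not in the web packages themselves:
var storage metric.PreloadingPersistence // in practice a *tiered.TieredStorage built at startup

msrv := &api.MetricsService{
	Storage: storage,
}
msrv.RegisterHandler()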