mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-21 03:16:00 -08:00
Merge pull request #98 from prometheus/julius-integrate-tiered-layers
Implement tiered data integration for simple (non-value) Get* methods
This commit is contained in:
commit
567a998e63
|
@ -44,6 +44,7 @@ type MetricPersistence interface {
|
||||||
// name.
|
// name.
|
||||||
GetFingerprintsForLabelName(model.LabelName) (model.Fingerprints, error)
|
GetFingerprintsForLabelName(model.LabelName) (model.Fingerprints, error)
|
||||||
|
|
||||||
|
// Get the metric associated with the provided fingerprint.
|
||||||
GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error)
|
GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error)
|
||||||
|
|
||||||
GetValueAtTime(model.Fingerprint, time.Time, StalenessPolicy) (*model.Sample, error)
|
GetValueAtTime(model.Fingerprint, time.Time, StalenessPolicy) (*model.Sample, error)
|
||||||
|
@ -52,7 +53,8 @@ type MetricPersistence interface {
|
||||||
|
|
||||||
ForEachSample(IteratorsForFingerprintBuilder) (err error)
|
ForEachSample(IteratorsForFingerprintBuilder) (err error)
|
||||||
|
|
||||||
GetAllMetricNames() ([]string, error)
|
// Get all label values that are associated with a given label name.
|
||||||
|
GetAllValuesForLabel(model.LabelName) (model.LabelValues, error)
|
||||||
|
|
||||||
// Requests the storage stack to build a materialized View of the values
|
// Requests the storage stack to build a materialized View of the values
|
||||||
// contained therein.
|
// contained therein.
|
||||||
|
|
|
@ -1221,13 +1221,16 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.
|
||||||
type MetricKeyDecoder struct{}
|
type MetricKeyDecoder struct{}
|
||||||
|
|
||||||
func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
|
func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
|
||||||
unmarshaled := &dto.LabelPair{}
|
unmarshaled := dto.LabelPair{}
|
||||||
err = proto.Unmarshal(in.([]byte), unmarshaled)
|
err = proto.Unmarshal(in.([]byte), &unmarshaled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
out = unmarshaled
|
out = model.LabelPair{
|
||||||
|
Name: model.LabelName(*unmarshaled.Name),
|
||||||
|
Value: model.LabelValue(*unmarshaled.Value),
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1236,35 +1239,40 @@ func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricNamesFilter struct{}
|
type LabelNameFilter struct {
|
||||||
|
labelName model.LabelName
|
||||||
|
}
|
||||||
|
|
||||||
func (f *MetricNamesFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
|
func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
|
||||||
unmarshaled, ok := key.(*dto.LabelPair)
|
labelPair, ok := key.(model.LabelPair)
|
||||||
if ok && *unmarshaled.Name == "name" {
|
if ok && labelPair.Name == f.labelName {
|
||||||
return storage.ACCEPT
|
return storage.ACCEPT
|
||||||
}
|
}
|
||||||
return storage.SKIP
|
return storage.SKIP
|
||||||
}
|
}
|
||||||
|
|
||||||
type CollectMetricNamesOp struct {
|
type CollectLabelValuesOp struct {
|
||||||
metricNames []string
|
labelValues []model.LabelValue
|
||||||
}
|
}
|
||||||
|
|
||||||
func (op *CollectMetricNamesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
|
func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
|
||||||
unmarshaled := key.(*dto.LabelPair)
|
labelPair := key.(model.LabelPair)
|
||||||
op.metricNames = append(op.metricNames, *unmarshaled.Value)
|
op.labelValues = append(op.labelValues, model.LabelValue(labelPair.Value))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LevelDBMetricPersistence) GetAllMetricNames() (metricNames []string, err error) {
|
func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
|
||||||
metricNamesOp := &CollectMetricNamesOp{}
|
filter := &LabelNameFilter{
|
||||||
|
labelName: labelName,
|
||||||
|
}
|
||||||
|
labelValuesOp := &CollectLabelValuesOp{}
|
||||||
|
|
||||||
_, err = l.labelSetToFingerprints.ForEach(&MetricKeyDecoder{}, &MetricNamesFilter{}, metricNamesOp)
|
_, err = l.labelSetToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
metricNames = metricNamesOp.metricNames
|
values = labelValuesOp.labelValues
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -348,8 +348,17 @@ func (s memorySeriesStorage) Close() (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s memorySeriesStorage) GetAllMetricNames() ([]string, error) {
|
func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
|
||||||
panic("not implemented")
|
valueSet := map[model.LabelValue]bool{}
|
||||||
|
for _, series := range s.fingerprintToSeries {
|
||||||
|
if value, ok := series.metric[labelName]; ok {
|
||||||
|
if !valueSet[value] {
|
||||||
|
values = append(values, value)
|
||||||
|
valueSet[value] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s memorySeriesStorage) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) {
|
func (s memorySeriesStorage) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) {
|
||||||
|
|
|
@ -63,9 +63,12 @@ type Storage interface {
|
||||||
Flush()
|
Flush()
|
||||||
Close()
|
Close()
|
||||||
|
|
||||||
// MetricPersistence proxy methods.
|
// Get all label values that are associated with the provided label name.
|
||||||
GetAllMetricNames() ([]string, error)
|
GetAllValuesForLabel(model.LabelName) (model.LabelValues, error)
|
||||||
|
// Get all of the metric fingerprints that are associated with the provided
|
||||||
|
// label set.
|
||||||
GetFingerprintsForLabelSet(model.LabelSet) (model.Fingerprints, error)
|
GetFingerprintsForLabelSet(model.LabelSet) (model.Fingerprints, error)
|
||||||
|
// Get the metric associated with the provided fingerprint.
|
||||||
GetMetricForFingerprint(model.Fingerprint) (m *model.Metric, err error)
|
GetMetricForFingerprint(model.Fingerprint) (m *model.Metric, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -518,17 +521,54 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tieredStorage) GetAllMetricNames() ([]string, error) {
|
func (t *tieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
|
||||||
// TODO: handle memory persistence as well.
|
diskValues, err := t.diskStorage.GetAllValuesForLabel(labelName)
|
||||||
return t.diskStorage.GetAllMetricNames()
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
valueSet := map[model.LabelValue]bool{}
|
||||||
|
for _, value := range append(diskValues, memoryValues...) {
|
||||||
|
if !valueSet[value] {
|
||||||
|
values = append(values, value)
|
||||||
|
valueSet[value] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (model.Fingerprints, error) {
|
func (t *tieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) {
|
||||||
// TODO: handle memory persistence as well.
|
memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
|
||||||
return t.diskStorage.GetFingerprintsForLabelSet(labelSet)
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
diskFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fingerprintSet := map[model.Fingerprint]bool{}
|
||||||
|
for _, fingerprint := range append(memFingerprints, diskFingerprints...) {
|
||||||
|
fingerprintSet[fingerprint] = true
|
||||||
|
}
|
||||||
|
for fingerprint := range fingerprintSet {
|
||||||
|
fingerprints = append(fingerprints, fingerprint)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
|
func (t *tieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
|
||||||
// TODO: handle memory persistence as well.
|
m, err = t.memoryArena.GetMetricForFingerprint(f)
|
||||||
return t.diskStorage.GetMetricForFingerprint(f)
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if m == nil {
|
||||||
|
m, err = t.diskStorage.GetMetricForFingerprint(f)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,10 +19,37 @@ import (
|
||||||
"github.com/prometheus/prometheus/utility/test"
|
"github.com/prometheus/prometheus/utility/test"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type testTieredStorageCloser struct {
|
||||||
|
storage Storage
|
||||||
|
dirName string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testTieredStorageCloser) Close() {
|
||||||
|
t.storage.Close()
|
||||||
|
os.RemoveAll(t.dirName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestTieredStorage(t test.Tester) (storage Storage, closer *testTieredStorageCloser) {
|
||||||
|
tempDir, _ := ioutil.TempDir("", "test_tiered_storage")
|
||||||
|
storage = NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, tempDir)
|
||||||
|
|
||||||
|
if storage == nil {
|
||||||
|
t.Fatalf("%d. storage == nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
go storage.Serve()
|
||||||
|
closer = &testTieredStorageCloser{
|
||||||
|
storage: storage,
|
||||||
|
dirName: tempDir,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func buildSamples(from, to time.Time, interval time.Duration, m model.Metric) (v []model.Sample) {
|
func buildSamples(from, to time.Time, interval time.Duration, m model.Metric) (v []model.Sample) {
|
||||||
i := model.SampleValue(0)
|
i := model.SampleValue(0)
|
||||||
|
|
||||||
|
@ -340,21 +367,8 @@ func testMakeView(t test.Tester) {
|
||||||
)
|
)
|
||||||
|
|
||||||
for i, scenario := range scenarios {
|
for i, scenario := range scenarios {
|
||||||
var (
|
tiered, closer := newTestTieredStorage(t)
|
||||||
temporary, _ = ioutil.TempDir("", "test_make_view")
|
defer closer.Close()
|
||||||
tiered = NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, temporary)
|
|
||||||
)
|
|
||||||
|
|
||||||
if tiered == nil {
|
|
||||||
t.Fatalf("%d. tiered == nil", i)
|
|
||||||
}
|
|
||||||
|
|
||||||
go tiered.Serve()
|
|
||||||
defer tiered.Drain()
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
os.RemoveAll(temporary)
|
|
||||||
}()
|
|
||||||
|
|
||||||
for j, datum := range scenario.data {
|
for j, datum := range scenario.data {
|
||||||
err := tiered.AppendSample(datum)
|
err := tiered.AppendSample(datum)
|
||||||
|
@ -417,3 +431,99 @@ func BenchmarkMakeView(b *testing.B) {
|
||||||
testMakeView(b)
|
testMakeView(b)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetAllValuesForLabel(t *testing.T) {
|
||||||
|
type in struct {
|
||||||
|
metricName string
|
||||||
|
appendToMemory bool
|
||||||
|
appendToDisk bool
|
||||||
|
}
|
||||||
|
|
||||||
|
scenarios := []struct {
|
||||||
|
in []in
|
||||||
|
out []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
// Empty case.
|
||||||
|
}, {
|
||||||
|
in: []in{
|
||||||
|
{
|
||||||
|
metricName: "request_count",
|
||||||
|
appendToMemory: false,
|
||||||
|
appendToDisk: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
out: []string{
|
||||||
|
"request_count",
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
in: []in{
|
||||||
|
{
|
||||||
|
metricName: "request_count",
|
||||||
|
appendToMemory: true,
|
||||||
|
appendToDisk: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
metricName: "start_time",
|
||||||
|
appendToMemory: false,
|
||||||
|
appendToDisk: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
out: []string{
|
||||||
|
"request_count",
|
||||||
|
"start_time",
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
in: []in{
|
||||||
|
{
|
||||||
|
metricName: "request_count",
|
||||||
|
appendToMemory: true,
|
||||||
|
appendToDisk: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
metricName: "start_time",
|
||||||
|
appendToMemory: true,
|
||||||
|
appendToDisk: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
out: []string{
|
||||||
|
"request_count",
|
||||||
|
"start_time",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, scenario := range scenarios {
|
||||||
|
tiered, closer := newTestTieredStorage(t)
|
||||||
|
defer closer.Close()
|
||||||
|
for j, metric := range scenario.in {
|
||||||
|
sample := model.Sample{
|
||||||
|
Metric: model.Metric{"name": model.LabelValue(metric.metricName)},
|
||||||
|
}
|
||||||
|
if metric.appendToMemory {
|
||||||
|
if err := tiered.(*tieredStorage).memoryArena.AppendSample(sample); err != nil {
|
||||||
|
t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if metric.appendToDisk {
|
||||||
|
if err := tiered.(*tieredStorage).diskStorage.AppendSample(sample); err != nil {
|
||||||
|
t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
metricNames, err := tiered.GetAllValuesForLabel("name")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%d. Error getting metric names: %s", i, err)
|
||||||
|
}
|
||||||
|
if len(metricNames) != len(scenario.out) {
|
||||||
|
t.Fatalf("%d. Expected metric count %d, got %d", i, len(scenario.out), len(metricNames))
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(metricNames)
|
||||||
|
for j, expected := range scenario.out {
|
||||||
|
if expected != string(metricNames[j]) {
|
||||||
|
t.Fatalf("%d.%d. Expected metric %s, got %s", i, j, expected, metricNames[j])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -86,7 +86,7 @@ func (serv MetricsService) QueryRange(expr string, end int64, duration int64, st
|
||||||
}
|
}
|
||||||
|
|
||||||
func (serv MetricsService) Metrics() string {
|
func (serv MetricsService) Metrics() string {
|
||||||
metricNames, err := serv.appState.Storage.GetAllMetricNames()
|
metricNames, err := serv.appState.Storage.GetAllValuesForLabel("name")
|
||||||
rb := serv.ResponseBuilder()
|
rb := serv.ResponseBuilder()
|
||||||
rb.SetContentType(gorest.Application_Json)
|
rb.SetContentType(gorest.Application_Json)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -94,7 +94,7 @@ func (serv MetricsService) Metrics() string {
|
||||||
rb.SetResponseCode(http.StatusInternalServerError)
|
rb.SetResponseCode(http.StatusInternalServerError)
|
||||||
return err.Error()
|
return err.Error()
|
||||||
}
|
}
|
||||||
sort.Strings(metricNames)
|
sort.Sort(metricNames)
|
||||||
resultBytes, err := json.Marshal(metricNames)
|
resultBytes, err := json.Marshal(metricNames)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error marshalling metric names: %v", err)
|
log.Printf("Error marshalling metric names: %v", err)
|
||||||
|
|
Loading…
Reference in a new issue