mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 22:19:40 -08:00
Merge pull request #120 from prometheus/feature/storage/compaction
Spin up curator run in the tests.
This commit is contained in:
commit
d79c932a8e
|
@ -15,13 +15,14 @@ package coding
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"code.google.com/p/goprotobuf/proto"
|
"code.google.com/p/goprotobuf/proto"
|
||||||
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ProtocolBufferEncoder struct {
|
type ProtocolBuffer struct {
|
||||||
message proto.Message
|
message proto.Message
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) {
|
func (p ProtocolBuffer) Encode() (raw []byte, err error) {
|
||||||
raw, err = proto.Marshal(p.message)
|
raw, err = proto.Marshal(p.message)
|
||||||
|
|
||||||
// XXX: Adjust legacy users of this to not check for error.
|
// XXX: Adjust legacy users of this to not check for error.
|
||||||
|
@ -32,8 +33,12 @@ func (p *ProtocolBufferEncoder) Encode() (raw []byte, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProtocolBufferEncoder(message proto.Message) *ProtocolBufferEncoder {
|
func (p ProtocolBuffer) String() string {
|
||||||
return &ProtocolBufferEncoder{
|
return fmt.Sprintf("ProtocolBufferEncoder of %s", p.message)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProtocolBuffer(message proto.Message) *ProtocolBuffer {
|
||||||
|
return &ProtocolBuffer{
|
||||||
message: message,
|
message: message,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
1
storage/metric/.gitignore
vendored
Normal file
1
storage/metric/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
||||||
|
command-line-arguments.test
|
|
@ -37,8 +37,9 @@ type curator struct {
|
||||||
// watermarks is the on-disk store that is scanned for high watermarks for
|
// watermarks is the on-disk store that is scanned for high watermarks for
|
||||||
// given metrics.
|
// given metrics.
|
||||||
watermarks raw.Persistence
|
watermarks raw.Persistence
|
||||||
// cutOff represents the most recent time up to which values will be curated.
|
// recencyThreshold represents the most recent time up to which values will be
|
||||||
cutOff time.Time
|
// curated.
|
||||||
|
recencyThreshold time.Duration
|
||||||
// groupingQuantity represents the number of samples below which encountered
|
// groupingQuantity represents the number of samples below which encountered
|
||||||
// samples will be dismembered and reaggregated into larger groups.
|
// samples will be dismembered and reaggregated into larger groups.
|
||||||
groupingQuantity uint32
|
groupingQuantity uint32
|
||||||
|
@ -48,9 +49,9 @@ type curator struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newCurator builds a new curator for the given LevelDB databases.
|
// newCurator builds a new curator for the given LevelDB databases.
|
||||||
func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
|
func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
|
||||||
return curator{
|
return curator{
|
||||||
cutOff: cutOff,
|
recencyThreshold: recencyThreshold,
|
||||||
stop: make(chan bool),
|
stop: make(chan bool),
|
||||||
samples: samples,
|
samples: samples,
|
||||||
curationState: curationState,
|
curationState: curationState,
|
||||||
|
@ -60,19 +61,19 @@ func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, sample
|
||||||
}
|
}
|
||||||
|
|
||||||
// run facilitates the curation lifecycle.
|
// run facilitates the curation lifecycle.
|
||||||
func (c curator) run() (err error) {
|
func (c curator) run(instant time.Time) (err error) {
|
||||||
var (
|
decoder := watermarkDecoder{}
|
||||||
decoder watermarkDecoder
|
filter := watermarkFilter{
|
||||||
filter = watermarkFilter{
|
|
||||||
stop: c.stop,
|
stop: c.stop,
|
||||||
curationState: c.curationState,
|
curationState: c.curationState,
|
||||||
|
groupSize: c.groupingQuantity,
|
||||||
|
recencyThreshold: c.recencyThreshold,
|
||||||
}
|
}
|
||||||
operator = watermarkOperator{
|
operator := watermarkOperator{
|
||||||
olderThan: c.cutOff,
|
olderThan: instant.Add(-1 * c.recencyThreshold),
|
||||||
groupSize: c.groupingQuantity,
|
groupSize: c.groupingQuantity,
|
||||||
curationState: c.curationState,
|
curationState: c.curationState,
|
||||||
}
|
}
|
||||||
)
|
|
||||||
|
|
||||||
_, err = c.watermarks.ForEach(decoder, filter, operator)
|
_, err = c.watermarks.ForEach(decoder, filter, operator)
|
||||||
|
|
||||||
|
@ -126,24 +127,28 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro
|
||||||
// watermarkFilter determines whether to include or exclude candidate
|
// watermarkFilter determines whether to include or exclude candidate
|
||||||
// values from the curation process by virtue of how old the high watermark is.
|
// values from the curation process by virtue of how old the high watermark is.
|
||||||
type watermarkFilter struct {
|
type watermarkFilter struct {
|
||||||
// curationState is the table of CurationKey to CurationValues that remark on
|
// curationState is the table of CurationKey to CurationValues that rema
|
||||||
// far along the curation process has gone for a given metric fingerprint.
|
// far along the curation process has gone for a given metric fingerprint.
|
||||||
curationState raw.Persistence
|
curationState raw.Persistence
|
||||||
// stop, when non-empty, instructs the filter to stop operation.
|
// stop, when non-empty, instructs the filter to stop operation.
|
||||||
stop chan bool
|
stop chan bool
|
||||||
|
// groupSize refers to the target groupSize from the curator.
|
||||||
|
groupSize uint32
|
||||||
|
// recencyThreshold refers to the target recencyThreshold from the curator.
|
||||||
|
recencyThreshold time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) {
|
func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) {
|
||||||
var (
|
fingerprint := key.(model.Fingerprint)
|
||||||
fingerprint = key.(model.Fingerprint)
|
watermark := value.(model.Watermark)
|
||||||
watermark = value.(model.Watermark)
|
curationKey := &dto.CurationKey{
|
||||||
curationKey = fingerprint.ToDTO()
|
Fingerprint: fingerprint.ToDTO(),
|
||||||
rawCurationValue []byte
|
MinimumGroupSize: proto.Uint32(w.groupSize),
|
||||||
err error
|
OlderThan: proto.Int64(int64(w.recencyThreshold)),
|
||||||
curationValue = &dto.CurationValue{}
|
}
|
||||||
)
|
curationValue := &dto.CurationValue{}
|
||||||
|
|
||||||
rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey))
|
rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -229,7 +234,7 @@ func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, er
|
||||||
MinimumGroupSize: proto.Uint32(w.groupSize),
|
MinimumGroupSize: proto.Uint32(w.groupSize),
|
||||||
}
|
}
|
||||||
|
|
||||||
curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey))
|
curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey))
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -247,7 +252,7 @@ func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark mod
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey))
|
rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ type (
|
||||||
curationState struct {
|
curationState struct {
|
||||||
fingerprint string
|
fingerprint string
|
||||||
groupSize int
|
groupSize int
|
||||||
olderThan time.Duration
|
recencyThreshold time.Duration
|
||||||
lastCurated time.Time
|
lastCurated time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,21 +48,23 @@ type (
|
||||||
values []sample
|
values []sample
|
||||||
}
|
}
|
||||||
|
|
||||||
context struct {
|
in struct {
|
||||||
curationStates fixture.Pairs
|
curationStates fixture.Pairs
|
||||||
watermarkStates fixture.Pairs
|
watermarkStates fixture.Pairs
|
||||||
sampleGroups fixture.Pairs
|
sampleGroups fixture.Pairs
|
||||||
|
recencyThreshold time.Duration
|
||||||
|
groupSize uint32
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c curationState) Get() (key, value coding.Encoder) {
|
func (c curationState) Get() (key, value coding.Encoder) {
|
||||||
key = coding.NewProtocolBufferEncoder(&dto.CurationKey{
|
key = coding.NewProtocolBuffer(&dto.CurationKey{
|
||||||
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(),
|
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(),
|
||||||
MinimumGroupSize: proto.Uint32(uint32(c.groupSize)),
|
MinimumGroupSize: proto.Uint32(uint32(c.groupSize)),
|
||||||
OlderThan: proto.Int64(int64(c.olderThan)),
|
OlderThan: proto.Int64(int64(c.recencyThreshold)),
|
||||||
})
|
})
|
||||||
|
|
||||||
value = coding.NewProtocolBufferEncoder(&dto.CurationValue{
|
value = coding.NewProtocolBuffer(&dto.CurationValue{
|
||||||
LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()),
|
LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -70,13 +72,13 @@ func (c curationState) Get() (key, value coding.Encoder) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w watermarkState) Get() (key, value coding.Encoder) {
|
func (w watermarkState) Get() (key, value coding.Encoder) {
|
||||||
key = coding.NewProtocolBufferEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
|
key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
|
||||||
value = coding.NewProtocolBufferEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
|
value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s sampleGroup) Get() (key, value coding.Encoder) {
|
func (s sampleGroup) Get() (key, value coding.Encoder) {
|
||||||
key = coding.NewProtocolBufferEncoder(&dto.SampleKey{
|
key = coding.NewProtocolBuffer(&dto.SampleKey{
|
||||||
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(),
|
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(),
|
||||||
Timestamp: indexable.EncodeTime(s.values[0].time),
|
Timestamp: indexable.EncodeTime(s.values[0].time),
|
||||||
LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()),
|
LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()),
|
||||||
|
@ -92,7 +94,7 @@ func (s sampleGroup) Get() (key, value coding.Encoder) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
value = coding.NewProtocolBufferEncoder(series)
|
value = coding.NewProtocolBuffer(series)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -100,21 +102,30 @@ func (s sampleGroup) Get() (key, value coding.Encoder) {
|
||||||
func TestCurator(t *testing.T) {
|
func TestCurator(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
scenarios = []struct {
|
scenarios = []struct {
|
||||||
context context
|
in in
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
context: context{
|
in: in{
|
||||||
|
recencyThreshold: 1 * time.Hour,
|
||||||
|
groupSize: 5,
|
||||||
curationStates: fixture.Pairs{
|
curationStates: fixture.Pairs{
|
||||||
curationState{
|
curationState{
|
||||||
fingerprint: "0001-A-1-Z",
|
fingerprint: "0001-A-1-Z",
|
||||||
groupSize: 5,
|
groupSize: 5,
|
||||||
olderThan: 1 * time.Hour,
|
recencyThreshold: 1 * time.Hour,
|
||||||
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
|
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
|
||||||
},
|
},
|
||||||
curationState{
|
curationState{
|
||||||
fingerprint: "0002-A-2-Z",
|
fingerprint: "0002-A-2-Z",
|
||||||
groupSize: 5,
|
groupSize: 5,
|
||||||
olderThan: 1 * time.Hour,
|
recencyThreshold: 1 * time.Hour,
|
||||||
|
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||||
|
},
|
||||||
|
// This rule should effectively be ignored.
|
||||||
|
curationState{
|
||||||
|
fingerprint: "0002-A-2-Z",
|
||||||
|
groupSize: 2,
|
||||||
|
recencyThreshold: 30 * time.Minute,
|
||||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -124,7 +135,7 @@ func TestCurator(t *testing.T) {
|
||||||
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
|
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
|
||||||
},
|
},
|
||||||
watermarkState{
|
watermarkState{
|
||||||
fingerprint: "0002-A-1-Z",
|
fingerprint: "0002-A-2-Z",
|
||||||
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
|
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -479,26 +490,26 @@ func TestCurator(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, scenario := range scenarios {
|
for _, scenario := range scenarios {
|
||||||
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.context.curationStates))
|
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates))
|
||||||
defer curatorDirectory.Close()
|
defer curatorDirectory.Close()
|
||||||
|
|
||||||
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.context.watermarkStates))
|
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates))
|
||||||
defer watermarkDirectory.Close()
|
defer watermarkDirectory.Close()
|
||||||
|
|
||||||
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.context.sampleGroups))
|
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
|
||||||
defer sampleDirectory.Close()
|
defer sampleDirectory.Close()
|
||||||
|
|
||||||
curatorState, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
|
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer curatorState.Close()
|
defer curatorStates.Close()
|
||||||
|
|
||||||
watermarkState, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
|
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer watermarkState.Close()
|
defer watermarkStates.Close()
|
||||||
|
|
||||||
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
|
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -506,5 +517,7 @@ func TestCurator(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer samples.Close()
|
defer samples.Close()
|
||||||
|
|
||||||
|
c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates)
|
||||||
|
c.run(testInstant)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
|
||||||
Timestamp: upperSeek,
|
Timestamp: upperSeek,
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, err := coding.NewProtocolBufferEncoder(key).Encode()
|
raw, err := coding.NewProtocolBuffer(key).Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -151,7 +151,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
|
||||||
|
|
||||||
key.Timestamp = lowerSeek
|
key.Timestamp = lowerSeek
|
||||||
|
|
||||||
raw, err = coding.NewProtocolBufferEncoder(key).Encode()
|
raw, err = coding.NewProtocolBuffer(key).Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -339,7 +339,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
|
||||||
value.Member = append(value.Member, fingerprint.ToDTO())
|
value.Member = append(value.Member, fingerprint.ToDTO())
|
||||||
}
|
}
|
||||||
|
|
||||||
batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
|
batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = l.labelNameToFingerprints.Commit(batch)
|
err = l.labelNameToFingerprints.Commit(batch)
|
||||||
|
@ -414,7 +414,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
|
||||||
value.Member = append(value.Member, fingerprint.ToDTO())
|
value.Member = append(value.Member, fingerprint.ToDTO())
|
||||||
}
|
}
|
||||||
|
|
||||||
batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
|
batch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = l.labelSetToFingerprints.Commit(batch)
|
err = l.labelSetToFingerprints.Commit(batch)
|
||||||
|
@ -442,8 +442,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
|
||||||
defer batch.Close()
|
defer batch.Close()
|
||||||
|
|
||||||
for fingerprint, metric := range metrics {
|
for fingerprint, metric := range metrics {
|
||||||
key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO())
|
key := coding.NewProtocolBuffer(fingerprint.ToDTO())
|
||||||
value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
|
value := coding.NewProtocolBuffer(model.MetricToDTO(metric))
|
||||||
batch.Put(key, value)
|
batch.Put(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -528,7 +528,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
|
||||||
|
|
||||||
// WART: We should probably encode simple fingerprints.
|
// WART: We should probably encode simple fingerprints.
|
||||||
for _, metric := range absentMetrics {
|
for _, metric := range absentMetrics {
|
||||||
key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
|
key := coding.NewProtocolBuffer(model.MetricToDTO(metric))
|
||||||
batch.Put(key, key)
|
batch.Put(key, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -563,7 +563,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
|
||||||
value = &dto.MetricHighWatermark{}
|
value = &dto.MetricHighWatermark{}
|
||||||
raw []byte
|
raw []byte
|
||||||
newestSampleTimestamp = samples[len(samples)-1].Timestamp
|
newestSampleTimestamp = samples[len(samples)-1].Timestamp
|
||||||
keyEncoded = coding.NewProtocolBufferEncoder(key)
|
keyEncoded = coding.NewProtocolBuffer(key)
|
||||||
)
|
)
|
||||||
|
|
||||||
key.Signature = proto.String(fingerprint.ToRowKey())
|
key.Signature = proto.String(fingerprint.ToRowKey())
|
||||||
|
@ -585,7 +585,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
||||||
batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value))
|
batch.Put(keyEncoded, coding.NewProtocolBuffer(value))
|
||||||
mutationCount++
|
mutationCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -661,7 +661,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
|
samplesBatch.Put(coding.NewProtocolBuffer(key), coding.NewProtocolBuffer(value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -752,7 +752,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
|
||||||
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
|
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
dtoKey := coding.NewProtocolBufferEncoder(dto)
|
dtoKey := coding.NewProtocolBuffer(dto)
|
||||||
value, err = l.metricMembershipIndex.Has(dtoKey)
|
value, err = l.metricMembershipIndex.Has(dtoKey)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -767,7 +767,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
|
||||||
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
|
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
dtoKey := coding.NewProtocolBufferEncoder(dto)
|
dtoKey := coding.NewProtocolBuffer(dto)
|
||||||
value, err = l.labelSetToFingerprints.Has(dtoKey)
|
value, err = l.labelSetToFingerprints.Has(dtoKey)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -782,7 +782,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
|
||||||
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
|
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
dtoKey := coding.NewProtocolBufferEncoder(dto)
|
dtoKey := coding.NewProtocolBuffer(dto)
|
||||||
value, err = l.labelNameToFingerprints.Has(dtoKey)
|
value, err = l.labelNameToFingerprints.Has(dtoKey)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -800,7 +800,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
|
||||||
sets := []utility.Set{}
|
sets := []utility.Set{}
|
||||||
|
|
||||||
for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) {
|
for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) {
|
||||||
f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO))
|
f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBuffer(labelSetDTO))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fps, err
|
return fps, err
|
||||||
}
|
}
|
||||||
|
@ -847,7 +847,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
|
||||||
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
|
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(&labelName)))
|
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -876,7 +876,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
|
||||||
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
|
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f)))
|
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -958,7 +958,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
|
||||||
Timestamp: indexable.EncodeTime(t),
|
Timestamp: indexable.EncodeTime(t),
|
||||||
}
|
}
|
||||||
|
|
||||||
e, err := coding.NewProtocolBufferEncoder(k).Encode()
|
e, err := coding.NewProtocolBuffer(k).Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1161,7 +1161,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.
|
||||||
Timestamp: indexable.EncodeTime(i.OldestInclusive),
|
Timestamp: indexable.EncodeTime(i.OldestInclusive),
|
||||||
}
|
}
|
||||||
|
|
||||||
e, err := coding.NewProtocolBufferEncoder(k).Encode()
|
e, err := coding.NewProtocolBuffer(k).Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -469,7 +469,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try seeking to target key.
|
// Try seeking to target key.
|
||||||
rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode()
|
rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode()
|
||||||
iterator.Seek(rawKey)
|
iterator.Seek(rawKey)
|
||||||
|
|
||||||
foundKey, err := extractSampleKey(iterator)
|
foundKey, err := extractSampleKey(iterator)
|
||||||
|
|
|
@ -21,7 +21,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
existenceValue = coding.NewProtocolBufferEncoder(&dto.MembershipIndexValue{})
|
existenceValue = coding.NewProtocolBuffer(&dto.MembershipIndexValue{})
|
||||||
)
|
)
|
||||||
|
|
||||||
type LevelDBMembershipIndex struct {
|
type LevelDBMembershipIndex struct {
|
||||||
|
|
|
@ -15,7 +15,6 @@ package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/prometheus/prometheus/coding"
|
"github.com/prometheus/prometheus/coding"
|
||||||
"github.com/prometheus/prometheus/storage/raw"
|
|
||||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||||
"github.com/prometheus/prometheus/utility/test"
|
"github.com/prometheus/prometheus/utility/test"
|
||||||
)
|
)
|
||||||
|
@ -64,13 +63,7 @@ type (
|
||||||
|
|
||||||
func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) {
|
func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) {
|
||||||
t = test.NewTemporaryDirectory(n, p.tester)
|
t = test.NewTemporaryDirectory(n, p.tester)
|
||||||
|
persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded)
|
||||||
var (
|
|
||||||
persistence raw.Persistence
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
persistence, err = leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
defer t.Close()
|
defer t.Close()
|
||||||
p.tester.Fatal(err)
|
p.tester.Fatal(err)
|
||||||
|
@ -83,12 +76,7 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for f.HasNext() {
|
for f.HasNext() {
|
||||||
var (
|
key, value := f.Next()
|
||||||
key coding.Encoder
|
|
||||||
value coding.Encoder
|
|
||||||
)
|
|
||||||
|
|
||||||
key, value = f.Next()
|
|
||||||
|
|
||||||
err = persistence.Put(key, value)
|
err = persistence.Put(key, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue