Merge pull request #210 from prometheus/refactor/storage/publicization

Publicize a few storage components for curation.

Commit 5a07e8a7c6
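In effect, the refactoring exports Curator.Stop together with the CurationRemarks, MetricHighWatermarks, and MetricSamples stores of LevelDBMetricPersistence, so curation code outside the storage package can reach them. The sketch below is a hypothetical illustration of that access, not code from this commit: the import path, the package name curation, and the wireCurator helper are assumptions, while the field names and the NewIterator/Close calls are taken from the hunks that follow.

package curation

// Hypothetical sketch, not part of this commit. Assumes the 2013-era import
// path and an already-opened persistence from NewLevelDBMetricPersistence.
import "github.com/prometheus/prometheus/storage/metric"

// wireCurator hands a drain channel and the now-exported sample store to
// external curation code, mirroring the test and renderView hunks below.
func wireCurator(persistence *metric.LevelDBMetricPersistence) (metric.Curator, func()) {
    stop := make(chan bool, 1) // buffer size is an assumption; Drain only checks len == 0
    curator := metric.Curator{Stop: stop}

    // The raw on-disk samples are now reachable directly, e.g. for scans.
    iterator := persistence.MetricSamples.NewIterator(true)

    cleanup := func() {
        iterator.Close()
        close(stop)
    }
    return curator, cleanup
}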
@@ -60,10 +60,10 @@ type watermarkFilter struct {
 // stored samples on-disk. This is useful to compact sparse sample values into
 // single sample entities to reduce keyspace load on the datastore.
 type Curator struct {
-    // stop functions as a channel that when empty allows the curator to operate.
+    // Stop functions as a channel that when empty allows the curator to operate.
     // The moment a value is ingested inside of it, the curator goes into drain
     // mode.
-    stop chan bool
+    Stop chan bool
 }

 // watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
@@ -142,7 +142,7 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
         ignoreYoungerThan: ignoreYoungerThan,
         processor:         processor,
         status:            status,
-        stop:              c.stop,
+        stop:              c.Stop,
         stopAt:            instant.Add(-1 * ignoreYoungerThan),
     }

@@ -167,8 +167,8 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
 // drain instructs the curator to stop at the next convenient moment as to not
 // introduce data inconsistencies.
 func (c Curator) Drain() {
-    if len(c.stop) == 0 {
-        c.stop <- true
+    if len(c.Stop) == 0 {
+        c.Stop <- true
     }
 }

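For orientation, here is a hypothetical sketch of how a caller might put the curator into drain mode once Stop is exported, either through Drain or by replicating its guarded send. The requestDrain helper and the package layout are assumptions; Drain's behaviour is exactly what the hunk above shows.

package curation

// Hypothetical sketch, not part of this commit.
import "github.com/prometheus/prometheus/storage/metric"

// requestDrain shows the two equivalent ways external code can ask the
// curator to stop: via Drain, or by writing to the exported Stop channel
// with the same len-guard Drain itself uses.
func requestDrain(c metric.Curator) {
    c.Drain()

    if len(c.Stop) == 0 {
        c.Stop <- true
    }
}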
@@ -37,13 +37,13 @@ var (
 )

 type LevelDBMetricPersistence struct {
-    curationRemarks         *leveldb.LevelDBPersistence
+    CurationRemarks         *leveldb.LevelDBPersistence
     fingerprintToMetrics    *leveldb.LevelDBPersistence
     labelNameToFingerprints *leveldb.LevelDBPersistence
     labelSetToFingerprints  *leveldb.LevelDBPersistence
-    metricHighWatermarks    *leveldb.LevelDBPersistence
+    MetricHighWatermarks    *leveldb.LevelDBPersistence
     metricMembershipIndex   *index.LevelDBMembershipIndex
-    metricSamples           *leveldb.LevelDBPersistence
+    MetricSamples           *leveldb.LevelDBPersistence
 }

 var (
@@ -65,13 +65,13 @@ type leveldbCloser interface {

 func (l *LevelDBMetricPersistence) Close() {
     var persistences = []leveldbCloser{
-        l.curationRemarks,
+        l.CurationRemarks,
         l.fingerprintToMetrics,
         l.labelNameToFingerprints,
         l.labelSetToFingerprints,
-        l.metricHighWatermarks,
+        l.MetricHighWatermarks,
         l.metricMembershipIndex,
-        l.metricSamples,
+        l.MetricSamples,
     }

     closerGroup := sync.WaitGroup{}
@@ -110,7 +110,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
             "Samples by Fingerprint",
             func() {
                 var err error
-                emission.metricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
+                emission.MetricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
                 workers.MayFail(err)
             },
         },
@@ -118,7 +118,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
             "High Watermarks by Fingerprint",
             func() {
                 var err error
-                emission.metricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
+                emission.MetricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
                 workers.MayFail(err)
             },
         },
@@ -150,7 +150,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
             "Sample Curation Remarks",
             func() {
                 var err error
-                emission.curationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10)
+                emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10)
                 workers.MayFail(err)
             },
         },
@@ -501,7 +501,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
         keyEncoded := coding.NewProtocolBuffer(key)

         key.Signature = proto.String(fingerprint.ToRowKey())
-        raw, err = l.metricHighWatermarks.Get(keyEncoded)
+        raw, err = l.MetricHighWatermarks.Get(keyEncoded)
         if err != nil {
             return
         }
@@ -521,7 +521,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
         mutationCount++
     }

-    err = l.metricHighWatermarks.Commit(batch)
+    err = l.MetricHighWatermarks.Commit(batch)
     if err != nil {
         return
     }
@@ -596,7 +596,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
         }
     }

-    err = l.metricSamples.Commit(samplesBatch)
+    err = l.MetricSamples.Commit(samplesBatch)
     if err != nil {
         return
     }
@@ -852,7 +852,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
         defer close(stop)

         c := Curator{
-            stop: stop,
+            Stop: stop,
         }

         err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
@@ -199,7 +199,7 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i
         return
     }

-    iterator := l.metricSamples.NewIterator(true)
+    iterator := l.MetricSamples.NewIterator(true)
     defer iterator.Close()

     for valid := iterator.Seek(e); valid; valid = iterator.Next() {
@@ -370,7 +370,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
         scans    = viewJob.builder.ScanJobs()
         view     = newView()
         // Get a single iterator that will be used for all data extraction below.
-        iterator = t.diskStorage.metricSamples.NewIterator(true)
+        iterator = t.diskStorage.MetricSamples.NewIterator(true)
     )
     defer iterator.Close()
