Publicize a few storage components for curation.

This commit introduces the publicization of Stop and other
components, which the compaction curator shall take advantage
of.
This commit is contained in:
Matt T. Proud 2013-05-02 12:49:13 +02:00
parent 4298bab2b0
commit a3f1d81e24
5 changed files with 20 additions and 20 deletions

View file

@ -60,10 +60,10 @@ type watermarkFilter struct {
// stored samples on-disk. This is useful to compact sparse sample values into // stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore. // single sample entities to reduce keyspace load on the datastore.
type Curator struct { type Curator struct {
// stop functions as a channel that when empty allows the curator to operate. // Stop functions as a channel that when empty allows the curator to operate.
// The moment a value is ingested inside of it, the curator goes into drain // The moment a value is ingested inside of it, the curator goes into drain
// mode. // mode.
stop chan bool Stop chan bool
} }
// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles // watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
@ -142,7 +142,7 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
ignoreYoungerThan: ignoreYoungerThan, ignoreYoungerThan: ignoreYoungerThan,
processor: processor, processor: processor,
status: status, status: status,
stop: c.stop, stop: c.Stop,
stopAt: instant.Add(-1 * ignoreYoungerThan), stopAt: instant.Add(-1 * ignoreYoungerThan),
} }
@ -167,8 +167,8 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
// drain instructs the curator to stop at the next convenient moment as to not // drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies. // introduce data inconsistencies.
func (c Curator) Drain() { func (c Curator) Drain() {
if len(c.stop) == 0 { if len(c.Stop) == 0 {
c.stop <- true c.Stop <- true
} }
} }

View file

@ -37,13 +37,13 @@ var (
) )
type LevelDBMetricPersistence struct { type LevelDBMetricPersistence struct {
curationRemarks *leveldb.LevelDBPersistence CurationRemarks *leveldb.LevelDBPersistence
fingerprintToMetrics *leveldb.LevelDBPersistence fingerprintToMetrics *leveldb.LevelDBPersistence
labelNameToFingerprints *leveldb.LevelDBPersistence labelNameToFingerprints *leveldb.LevelDBPersistence
labelSetToFingerprints *leveldb.LevelDBPersistence labelSetToFingerprints *leveldb.LevelDBPersistence
metricHighWatermarks *leveldb.LevelDBPersistence MetricHighWatermarks *leveldb.LevelDBPersistence
metricMembershipIndex *index.LevelDBMembershipIndex metricMembershipIndex *index.LevelDBMembershipIndex
metricSamples *leveldb.LevelDBPersistence MetricSamples *leveldb.LevelDBPersistence
} }
var ( var (
@ -65,13 +65,13 @@ type leveldbCloser interface {
func (l *LevelDBMetricPersistence) Close() { func (l *LevelDBMetricPersistence) Close() {
var persistences = []leveldbCloser{ var persistences = []leveldbCloser{
l.curationRemarks, l.CurationRemarks,
l.fingerprintToMetrics, l.fingerprintToMetrics,
l.labelNameToFingerprints, l.labelNameToFingerprints,
l.labelSetToFingerprints, l.labelSetToFingerprints,
l.metricHighWatermarks, l.MetricHighWatermarks,
l.metricMembershipIndex, l.metricMembershipIndex,
l.metricSamples, l.MetricSamples,
} }
closerGroup := sync.WaitGroup{} closerGroup := sync.WaitGroup{}
@ -110,7 +110,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Samples by Fingerprint", "Samples by Fingerprint",
func() { func() {
var err error var err error
emission.metricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10) emission.MetricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -118,7 +118,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"High Watermarks by Fingerprint", "High Watermarks by Fingerprint",
func() { func() {
var err error var err error
emission.metricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10) emission.MetricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -150,7 +150,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Sample Curation Remarks", "Sample Curation Remarks",
func() { func() {
var err error var err error
emission.curationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10) emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10)
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -501,7 +501,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
keyEncoded := coding.NewProtocolBuffer(key) keyEncoded := coding.NewProtocolBuffer(key)
key.Signature = proto.String(fingerprint.ToRowKey()) key.Signature = proto.String(fingerprint.ToRowKey())
raw, err = l.metricHighWatermarks.Get(keyEncoded) raw, err = l.MetricHighWatermarks.Get(keyEncoded)
if err != nil { if err != nil {
return return
} }
@ -521,7 +521,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
mutationCount++ mutationCount++
} }
err = l.metricHighWatermarks.Commit(batch) err = l.MetricHighWatermarks.Commit(batch)
if err != nil { if err != nil {
return return
} }
@ -596,7 +596,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
} }
} }
err = l.metricSamples.Commit(samplesBatch) err = l.MetricSamples.Commit(samplesBatch)
if err != nil { if err != nil {
return return
} }

View file

@ -852,7 +852,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
defer close(stop) defer close(stop)
c := Curator{ c := Curator{
stop: stop, Stop: stop,
} }
err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates) err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)

View file

@ -199,7 +199,7 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i
return return
} }
iterator := l.metricSamples.NewIterator(true) iterator := l.MetricSamples.NewIterator(true)
defer iterator.Close() defer iterator.Close()
for valid := iterator.Seek(e); valid; valid = iterator.Next() { for valid := iterator.Seek(e); valid; valid = iterator.Next() {

View file

@ -370,7 +370,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
scans = viewJob.builder.ScanJobs() scans = viewJob.builder.ScanJobs()
view = newView() view = newView()
// Get a single iterator that will be used for all data extraction below. // Get a single iterator that will be used for all data extraction below.
iterator = t.diskStorage.metricSamples.NewIterator(true) iterator = t.diskStorage.MetricSamples.NewIterator(true)
) )
defer iterator.Close() defer iterator.Close()