Merge pull request #223 from prometheus/feature/refactor/storage-exposition
Expose TieredStorage.DiskStorage.
commit 76dad4307d
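For context, a minimal sketch of the access pattern this change enables: code outside the storage package can now reach the LevelDB persistence layer directly through the exported field. The helper name, the import paths, and the "job" label are illustrative assumptions, not part of this commit; GetAllValuesForLabel is the persistence method TieredStorage itself already calls in the hunks below.

package example

// Import paths are assumed to match the repository layout at the time of this commit.
import (
	"log"

	"github.com/prometheus/prometheus/model"
	"github.com/prometheus/prometheus/storage/metric"
)

// listDiskLabelValues is a hypothetical caller illustrating direct access to the
// now-exported DiskStorage field; before this change, diskStorage was unexported
// and unreachable from other packages.
func listDiskLabelValues(tiered *metric.TieredStorage, name model.LabelName) {
	values, err := tiered.DiskStorage.GetAllValuesForLabel(name)
	if err != nil {
		log.Printf("failed to list values for label %q: %s", name, err)
		return
	}
	log.Printf("values for label %q on disk: %v", name, values)
}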
@@ -30,10 +30,12 @@ import (
 // TieredStorage both persists samples and generates materialized views for
 // queries.
 type TieredStorage struct {
+	// BUG(matt): This introduces a Law of Demeter violation. Ugh.
+	DiskStorage *LevelDBMetricPersistence
+
 	appendToDiskQueue   chan model.Samples
 	appendToMemoryQueue chan model.Samples
 	diskFrontier        *diskFrontier
-	diskStorage         *LevelDBMetricPersistence
 	draining            chan chan bool
 	flushMemoryInterval time.Duration
 	memoryArena         memorySeriesStorage
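The BUG(matt) comment added above records the trade-off: callers now reach two objects deep (t.DiskStorage.Something()) instead of asking TieredStorage itself. A hedged sketch of the delegation alternative the comment alludes to, written as if inside the storage package (where model is already imported); the method name is hypothetical and the AppendSample parameter type is assumed from its use in the tests below. This is not code from this commit.

// AppendSampleToDisk is a hypothetical forwarding method that would keep the
// disk layer unexported and avoid the Law of Demeter violation: TieredStorage
// exposes the operation callers need instead of handing out the
// *LevelDBMetricPersistence itself.
func (t TieredStorage) AppendSampleToDisk(sample model.Sample) error {
	return t.diskStorage.AppendSample(sample)
}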
@@ -60,7 +62,7 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
 	storage = &TieredStorage{
 		appendToDiskQueue:   make(chan model.Samples, appendToDiskQueueDepth),
 		appendToMemoryQueue: make(chan model.Samples, appendToMemoryQueueDepth),
-		diskStorage:         diskStorage,
+		DiskStorage:         diskStorage,
 		draining:            make(chan chan bool),
 		flushMemoryInterval: flushMemoryInterval,
 		memoryArena:         NewMemorySeriesStorage(),
@@ -210,7 +212,7 @@ func (t TieredStorage) Flush() {
 func (t TieredStorage) Close() {
 	log.Println("Closing tiered storage...")
 	t.Drain()
-	t.diskStorage.Close()
+	t.DiskStorage.Close()
 	t.memoryArena.Close()

 	close(t.appendToDiskQueue)
@@ -323,7 +325,7 @@ func (t *TieredStorage) flushMemory() {
 	defer t.mutex.Unlock()

 	flusher := &memoryToDiskFlusher{
-		disk:        t.diskStorage,
+		disk:        t.DiskStorage,
 		olderThan:   time.Now().Add(-1 * t.memoryTTL),
 		toDiskQueue: t.appendToDiskQueue,
 	}
@@ -351,7 +353,7 @@ func (t TieredStorage) renderView(viewJob viewJob) {
 		scans = viewJob.builder.ScanJobs()
 		view  = newView()
 		// Get a single iterator that will be used for all data extraction below.
-		iterator = t.diskStorage.MetricSamples.NewIterator(true)
+		iterator = t.DiskStorage.MetricSamples.NewIterator(true)
 	)
 	defer iterator.Close()

@@ -541,7 +543,7 @@ func (t TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *

 // Get all label values that are associated with the provided label name.
 func (t TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
-	diskValues, err := t.diskStorage.GetAllValuesForLabel(labelName)
+	diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName)
 	if err != nil {
 		return
 	}
@@ -568,7 +570,7 @@ func (t TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fing
 	if err != nil {
 		return
 	}
-	diskFingerprints, err := t.diskStorage.GetFingerprintsForLabelSet(labelSet)
+	diskFingerprints, err := t.DiskStorage.GetFingerprintsForLabelSet(labelSet)
 	if err != nil {
 		return
 	}
@@ -590,7 +592,7 @@ func (t TieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Me
 		return
 	}
 	if m == nil {
-		m, err = t.diskStorage.GetMetricForFingerprint(f)
+		m, err = t.DiskStorage.GetMetricForFingerprint(f)
 	}
 	return
 }
@@ -485,7 +485,7 @@ func TestGetAllValuesForLabel(t *testing.T) {
 			}
 		}
 		if metric.appendToDisk {
-			if err := tiered.diskStorage.AppendSample(sample); err != nil {
+			if err := tiered.DiskStorage.AppendSample(sample); err != nil {
 				t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err)
 			}
 		}
@@ -520,7 +520,7 @@ func TestGetFingerprintsForLabelSet(t *testing.T) {
 	if err := tiered.memoryArena.AppendSample(memorySample); err != nil {
 		t.Fatalf("Failed to add fixture data: %s", err)
 	}
-	if err := tiered.diskStorage.AppendSample(diskSample); err != nil {
+	if err := tiered.DiskStorage.AppendSample(diskSample); err != nil {
 		t.Fatalf("Failed to add fixture data: %s", err)
 	}
 	tiered.Flush()