Harden the tests against OOMs.

This commit employs explicit memory freeing for the in-memory storage
arenas.  Secondarily, we take advantage of smaller channel buffer sizes
in the test.
This commit is contained in:
Matt T. Proud 2013-04-29 11:17:56 +02:00
parent b8bc91d6c0
commit 6fac20c8af
5 changed files with 69 additions and 26 deletions

50
main.go
View file

@ -44,7 +44,34 @@ var (
memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 1000000, "The size of the queue for items that are pending writing to memory.")
)
type prometheus struct {
storage metric.Storage
// TODO: Refactor channels to work with arrays of results for better chunking.
scrapeResults chan format.Result
ruleResults chan *rules.Result
}
func (p prometheus) interruptHandler() {
notifier := make(chan os.Signal)
signal.Notify(notifier, os.Interrupt)
<-notifier
log.Println("Received SIGINT; Exiting Gracefully...")
p.close()
os.Exit(0)
}
func (p prometheus) close() {
p.storage.Close()
close(p.scrapeResults)
close(p.ruleResults)
}
func main() {
// TODO(all): Future additions to main should be, where applicable, glumped
// into the prometheus struct above---at least where the scoping of the entire
// server is concerned.
flag.Parse()
versionInfoTmpl.Execute(os.Stdout, BuildInfo)
@ -62,23 +89,25 @@ func main() {
if err != nil {
log.Fatalf("Error opening storage: %s", err)
}
scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity)
ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity)
prometheus := prometheus{
storage: ts,
scrapeResults: scrapeResults,
ruleResults: ruleResults,
}
defer prometheus.close()
go ts.Serve()
go func() {
notifier := make(chan os.Signal)
signal.Notify(notifier, os.Interrupt)
<-notifier
ts.Close()
os.Exit(0)
}()
go prometheus.interruptHandler()
// Queue depth will need to be exposed
scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity)
targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance)
targetManager.AddTargetsFromConfig(conf)
ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity)
ast.SetStorage(ts)
ruleManager := rules.NewRuleManager(ruleResults, conf.Global.EvaluationInterval)
@ -97,6 +126,7 @@ func main() {
web.StartServing(appState)
// TODO(all): Migrate this into prometheus.serve().
for {
select {
case scrapeResult := <-scrapeResults:

View file

@ -49,7 +49,6 @@ func (l LabelSet) Merge(other LabelSet) LabelSet {
return result
}
func (l LabelSet) String() string {
var (
buffer bytes.Buffer

View file

@ -86,7 +86,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
}
// let the deadline lapse
time.Sleep(15*time.Millisecond)
time.Sleep(15 * time.Millisecond)
// now scrape again
signal <- true

View file

@ -26,6 +26,10 @@ var (
testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC)
)
const (
appendQueueSize = 1000
)
func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) {
err := p.AppendSample(s)
if err != nil {
@ -86,7 +90,7 @@ func (t testTieredStorageCloser) Close() {
func NewTestTieredStorage(t test.Tester) (storage Storage, closer test.Closer) {
var directory test.TemporaryDirectory
directory = test.NewTemporaryDirectory("test_tiered_storage", t)
storage, err := NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
storage, err := NewTieredStorage(appendQueueSize, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
if err != nil {
if storage != nil {

View file

@ -21,6 +21,7 @@ import (
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"log"
"sort"
"sync"
"time"
@ -93,7 +94,7 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
return
}
func (t *tieredStorage) AppendSample(s model.Sample) (err error) {
func (t tieredStorage) AppendSample(s model.Sample) (err error) {
if len(t.draining) > 0 {
return fmt.Errorf("Storage is in the process of draining.")
}
@ -103,12 +104,14 @@ func (t *tieredStorage) AppendSample(s model.Sample) (err error) {
return
}
func (t *tieredStorage) Drain() {
func (t tieredStorage) Drain() {
log.Println("Starting drain...")
drainingDone := make(chan bool)
if len(t.draining) == 0 {
t.draining <- drainingDone
}
<-drainingDone
log.Println("Done.")
}
func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) {
@ -155,17 +158,17 @@ func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
panic(err)
return
}
return
}
func (t *tieredStorage) Serve() {
var (
flushMemoryTicker = time.Tick(t.flushMemoryInterval)
writeMemoryTicker = time.Tick(t.writeMemoryInterval)
reportTicker = time.NewTicker(time.Second)
)
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
defer flushMemoryTicker.Stop()
writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
defer writeMemoryTicker.Stop()
reportTicker := time.NewTicker(time.Second)
defer reportTicker.Stop()
go func() {
@ -176,9 +179,9 @@ func (t *tieredStorage) Serve() {
for {
select {
case <-writeMemoryTicker:
case <-writeMemoryTicker.C:
t.writeMemory()
case <-flushMemoryTicker:
case <-flushMemoryTicker.C:
t.flushMemory()
case viewRequest := <-t.viewQueue:
t.renderView(viewRequest)
@ -219,17 +222,24 @@ func (t *tieredStorage) writeMemory() {
}
}
func (t *tieredStorage) Flush() {
func (t tieredStorage) Flush() {
t.flush()
}
func (t *tieredStorage) Close() {
func (t tieredStorage) Close() {
log.Println("Closing tiered storage...")
t.Drain()
t.diskStorage.Close()
t.memoryArena.Close()
close(t.appendToDiskQueue)
close(t.appendToMemoryQueue)
close(t.viewQueue)
log.Println("Done.")
}
// Write all pending appends.
func (t *tieredStorage) flush() (err error) {
func (t tieredStorage) flush() (err error) {
// Trim any old values to reduce iterative write costs.
t.flushMemory()
t.writeMemory()