Merge pull request #100 from prometheus/julius-view-abortion

Abort view job processing on timeout.
This commit is contained in:
juliusv 2013-03-26 10:31:19 -07:00
commit 40cc435feb

View file

@ -46,6 +46,7 @@ type tieredStorage struct {
type viewJob struct { type viewJob struct {
builder ViewRequestBuilder builder ViewRequestBuilder
output chan View output chan View
abort chan bool
err chan error err chan error
} }
@ -115,11 +116,18 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
return return
} }
result := make(chan View) // The result channel needs a one-element buffer in case we have timed out in
// MakeView, but the view rendering still completes afterwards and writes to
// the channel.
result := make(chan View, 1)
// The abort channel needs a one-element buffer in case the view rendering
// has already exited and doesn't consume from the channel anymore.
abortChan := make(chan bool, 1)
errChan := make(chan error) errChan := make(chan error)
t.viewQueue <- viewJob{ t.viewQueue <- viewJob{
builder: builder, builder: builder,
output: result, output: result,
abort: abortChan,
err: errChan, err: errChan,
} }
@ -129,6 +137,7 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
case err = <-errChan: case err = <-errChan:
return return
case <-time.After(deadline): case <-time.After(deadline):
abortChan <- true
err = fmt.Errorf("MakeView timed out after %s.", deadline) err = fmt.Errorf("MakeView timed out after %s.", deadline)
} }
@ -379,6 +388,11 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
standingOps := scanJob.operations standingOps := scanJob.operations
for len(standingOps) > 0 { for len(standingOps) > 0 {
// Abort the view rendering if the caller (MakeView) has timed out.
if len(viewJob.abort) > 0 {
return
}
// Load data value chunk(s) around the first standing op's current time. // Load data value chunk(s) around the first standing op's current time.
highWatermark := *standingOps[0].CurrentTime() highWatermark := *standingOps[0].CurrentTime()
// XXX: For earnest performance gains analagous to the benchmarking we // XXX: For earnest performance gains analagous to the benchmarking we