mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
Remove race conditions from UncertaintyGroup
This commit is contained in:
parent
7f548a4297
commit
b886a14cfc
|
@ -15,6 +15,7 @@ package utility
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type state int
|
type state int
|
||||||
|
@ -62,18 +63,19 @@ type uncertaintyGroup struct {
|
||||||
successes uint
|
successes uint
|
||||||
results chan error
|
results chan error
|
||||||
anomalies []error
|
anomalies []error
|
||||||
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g uncertaintyGroup) Succeed() {
|
func (g *uncertaintyGroup) Succeed() {
|
||||||
if g.state == finished {
|
if g.isFinished() {
|
||||||
panic("cannot remark when done")
|
panic("cannot remark when done")
|
||||||
}
|
}
|
||||||
|
|
||||||
g.results <- nil
|
g.results <- nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g uncertaintyGroup) Fail(err error) {
|
func (g *uncertaintyGroup) Fail(err error) {
|
||||||
if g.state == finished {
|
if g.isFinished() {
|
||||||
panic("cannot remark when done")
|
panic("cannot remark when done")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,22 +86,42 @@ func (g uncertaintyGroup) Fail(err error) {
|
||||||
g.results <- err
|
g.results <- err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g uncertaintyGroup) MayFail(err error) {
|
func (g *uncertaintyGroup) MayFail(err error) {
|
||||||
if g.state == finished {
|
if g.isFinished() {
|
||||||
panic("cannot remark when done")
|
panic("cannot remark when done")
|
||||||
}
|
}
|
||||||
|
|
||||||
g.results <- err
|
g.results <- err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *uncertaintyGroup) Wait() bool {
|
func (g *uncertaintyGroup) isFinished() bool {
|
||||||
|
g.Lock()
|
||||||
|
defer g.Unlock()
|
||||||
|
|
||||||
|
return g.state == finished
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *uncertaintyGroup) finish() {
|
||||||
|
g.Lock()
|
||||||
|
defer g.Unlock()
|
||||||
|
|
||||||
|
g.state = finished
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *uncertaintyGroup) start() {
|
||||||
|
g.Lock()
|
||||||
|
defer g.Unlock()
|
||||||
|
|
||||||
if g.state != unstarted {
|
if g.state != unstarted {
|
||||||
panic("cannot restart")
|
panic("cannot restart")
|
||||||
}
|
}
|
||||||
|
|
||||||
defer close(g.results)
|
|
||||||
|
|
||||||
g.state = started
|
g.state = started
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *uncertaintyGroup) Wait() bool {
|
||||||
|
defer close(g.results)
|
||||||
|
g.start()
|
||||||
|
|
||||||
for g.remaining > 0 {
|
for g.remaining > 0 {
|
||||||
result := <-g.results
|
result := <-g.results
|
||||||
|
@ -113,12 +135,12 @@ func (g *uncertaintyGroup) Wait() bool {
|
||||||
g.remaining--
|
g.remaining--
|
||||||
}
|
}
|
||||||
|
|
||||||
g.state = finished
|
g.finish()
|
||||||
|
|
||||||
return len(g.anomalies) == 0
|
return len(g.anomalies) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g uncertaintyGroup) Errors() []error {
|
func (g *uncertaintyGroup) Errors() []error {
|
||||||
if g.state != finished {
|
if g.state != finished {
|
||||||
panic("cannot provide errors until finished")
|
panic("cannot provide errors until finished")
|
||||||
}
|
}
|
||||||
|
@ -126,7 +148,7 @@ func (g uncertaintyGroup) Errors() []error {
|
||||||
return g.anomalies
|
return g.anomalies
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g uncertaintyGroup) String() string {
|
func (g *uncertaintyGroup) String() string {
|
||||||
return fmt.Sprintf("UncertaintyGroup %s with %s failures", g.state, g.anomalies)
|
return fmt.Sprintf("UncertaintyGroup %s with %s failures", g.state, g.anomalies)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue