Introduce 'primary' storage in fanout, and have Add return the ref from the primary.

Also, ensure all append batches are rolled back when a commit or rollback fails.
This commit is contained in:
Tom Wilkie 2017-07-12 15:50:26 +01:00
parent db8128ceeb
commit 96e25adc8d

View file

@ -17,26 +17,38 @@ import (
"container/heap"
"strings"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/pkg/labels"
)
type fanout struct {
storages []Storage
primary Storage
secondaries []Storage
}
// NewFanout returns a new fan-out Storage, which proxies reads and writes
// through to multiple underlying storages.
func NewFanout(storages ...Storage) Storage {
func NewFanout(primary Storage, secondaries ...Storage) Storage {
return &fanout{
storages: storages,
primary: primary,
secondaries: secondaries,
}
}
func (f *fanout) Querier(mint, maxt int64) (Querier, error) {
queriers := mergeQuerier{
queriers: make([]Querier, 0, len(f.storages)),
queriers: make([]Querier, 0, 1+len(f.secondaries)),
}
for _, storage := range f.storages {
// Add primary querier
querier, err := f.primary.Querier(mint, maxt)
if err != nil {
return nil, err
}
queriers.queriers = append(queriers.queriers, querier)
// Add secondary queriers
for _, storage := range f.secondaries {
querier, err := storage.Querier(mint, maxt)
if err != nil {
queriers.Close()
@ -48,24 +60,34 @@ func (f *fanout) Querier(mint, maxt int64) (Querier, error) {
}
func (f *fanout) Appender() (Appender, error) {
appenders := make([]Appender, 0, len(f.storages))
for _, storage := range f.storages {
primary, err := f.primary.Appender()
if err != nil {
return nil, err
}
secondaries := make([]Appender, 0, len(f.secondaries))
for _, storage := range f.secondaries {
appender, err := storage.Appender()
if err != nil {
return nil, err
}
appenders = append(appenders, appender)
secondaries = append(secondaries, appender)
}
return &fanoutAppender{
appenders: appenders,
primary: primary,
secondaries: secondaries,
}, nil
}
// Close closes the storage and all its underlying resources.
func (f *fanout) Close() error {
if err := f.primary.Close(); err != nil {
return err
}
// TODO return multiple errors?
var lastErr error
for _, storage := range f.storages {
for _, storage := range f.secondaries {
if err := storage.Close(); err != nil {
lastErr = err
}
@ -75,40 +97,61 @@ func (f *fanout) Close() error {
// fanoutAppender implements Appender.
type fanoutAppender struct {
appenders []Appender
primary Appender
secondaries []Appender
}
func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error) {
for _, appender := range f.appenders {
ref, err := f.primary.Add(l, t, v)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.Add(l, t, v); err != nil {
return "", err
}
}
return "", nil
return ref, nil
}
func (f *fanoutAppender) AddFast(l labels.Labels, ref string, t int64, v float64) error {
for _, appender := range f.appenders {
if err := appender.AddFast(l, ref, t, v); err != nil {
if err := f.primary.AddFast(l, ref, t, v); err != nil {
return err
}
for _, appender := range f.secondaries {
if _, err := appender.Add(l, t, v); err != nil {
return err
}
}
return nil
}
func (f *fanoutAppender) Commit() error {
for _, appender := range f.appenders {
if err := appender.Commit(); err != nil {
return err
func (f *fanoutAppender) Commit() (err error) {
err = f.primary.Commit()
for _, appender := range f.secondaries {
if err == nil {
err = appender.Commit()
} else {
if rollbackErr := appender.Rollback(); rollbackErr != nil {
log.Errorf("Squashed rollback error on commit: %v", rollbackErr)
}
}
}
return nil
return
}
func (f *fanoutAppender) Rollback() error {
for _, appender := range f.appenders {
if err := appender.Rollback(); err != nil {
return err
func (f *fanoutAppender) Rollback() (err error) {
err = f.primary.Rollback()
for _, appender := range f.secondaries {
rollbackErr := appender.Rollback()
if err == nil {
err = rollbackErr
} else if rollbackErr != nil {
log.Errorf("Squashed rollback error on rollback: %v", rollbackErr)
}
}
return nil