From 96e25adc8dd76466ce94559f57507013638b2ab9 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 12 Jul 2017 15:50:26 +0100 Subject: [PATCH] Introduce 'primary' storage in fanout, and have Add return the ref from the primary. Also, ensure all append batches are rolled back when a commit or rollback fails. --- storage/fanout.go | 91 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 67 insertions(+), 24 deletions(-) diff --git a/storage/fanout.go b/storage/fanout.go index 9572f96d4..c30908727 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -17,26 +17,38 @@ import ( "container/heap" "strings" + "github.com/prometheus/common/log" "github.com/prometheus/prometheus/pkg/labels" ) type fanout struct { - storages []Storage + primary Storage + secondaries []Storage } // NewFanout returns a new fan-out Storage, which proxies reads and writes // through to multiple underlying storages. -func NewFanout(storages ...Storage) Storage { +func NewFanout(primary Storage, secondaries ...Storage) Storage { return &fanout{ - storages: storages, + primary: primary, + secondaries: secondaries, } } func (f *fanout) Querier(mint, maxt int64) (Querier, error) { queriers := mergeQuerier{ - queriers: make([]Querier, 0, len(f.storages)), + queriers: make([]Querier, 0, 1+len(f.secondaries)), } - for _, storage := range f.storages { + + // Add primary querier + querier, err := f.primary.Querier(mint, maxt) + if err != nil { + return nil, err + } + queriers.queriers = append(queriers.queriers, querier) + + // Add secondary queriers + for _, storage := range f.secondaries { querier, err := storage.Querier(mint, maxt) if err != nil { queriers.Close() @@ -48,24 +60,34 @@ func (f *fanout) Querier(mint, maxt int64) (Querier, error) { } func (f *fanout) Appender() (Appender, error) { - appenders := make([]Appender, 0, len(f.storages)) - for _, storage := range f.storages { + primary, err := f.primary.Appender() + if err != nil { + return nil, err + } + + secondaries := make([]Appender, 0, len(f.secondaries)) + for _, storage := range f.secondaries { appender, err := storage.Appender() if err != nil { return nil, err } - appenders = append(appenders, appender) + secondaries = append(secondaries, appender) } return &fanoutAppender{ - appenders: appenders, + primary: primary, + secondaries: secondaries, }, nil } // Close closes the storage and all its underlying resources. func (f *fanout) Close() error { + if err := f.primary.Close(); err != nil { + return err + } + // TODO return multiple errors? var lastErr error - for _, storage := range f.storages { + for _, storage := range f.secondaries { if err := storage.Close(); err != nil { lastErr = err } @@ -75,40 +97,61 @@ func (f *fanout) Close() error { // fanoutAppender implements Appender. type fanoutAppender struct { - appenders []Appender + primary Appender + secondaries []Appender } func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error) { - for _, appender := range f.appenders { + ref, err := f.primary.Add(l, t, v) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { if _, err := appender.Add(l, t, v); err != nil { return "", err } } - return "", nil + return ref, nil } func (f *fanoutAppender) AddFast(l labels.Labels, ref string, t int64, v float64) error { - for _, appender := range f.appenders { - if err := appender.AddFast(l, ref, t, v); err != nil { + if err := f.primary.AddFast(l, ref, t, v); err != nil { + return err + } + + for _, appender := range f.secondaries { + if _, err := appender.Add(l, t, v); err != nil { return err } } return nil } -func (f *fanoutAppender) Commit() error { - for _, appender := range f.appenders { - if err := appender.Commit(); err != nil { - return err +func (f *fanoutAppender) Commit() (err error) { + err = f.primary.Commit() + + for _, appender := range f.secondaries { + if err == nil { + err = appender.Commit() + } else { + if rollbackErr := appender.Rollback(); rollbackErr != nil { + log.Errorf("Squashed rollback error on commit: %v", rollbackErr) + } } } - return nil + return } -func (f *fanoutAppender) Rollback() error { - for _, appender := range f.appenders { - if err := appender.Rollback(); err != nil { - return err +func (f *fanoutAppender) Rollback() (err error) { + err = f.primary.Rollback() + + for _, appender := range f.secondaries { + rollbackErr := appender.Rollback() + if err == nil { + err = rollbackErr + } else if rollbackErr != nil { + log.Errorf("Squashed rollback error on rollback: %v", rollbackErr) } } return nil