Merge pull request #14496 from bboreham/fix-nil-primary (#14509)

[BUGFIX] Storage: errors from a single secondary querier should be warnings.

This is a backport of #14496 to release-2.54 branch.

#13434 introduced an unwanted change in behaviour: if there was no primary querier and a single secondary querier, the secondary would be treated like a primary.  This PR restores the previous behaviour, that all secondary queriers report errors as warnings.

In order to test this behaviour, I changed `TestMergeQuerierWithSecondaries_ErrorHandling` so it now calls `NewMergeQuerier` rather than creating the internal data structure directly. 

This in turn required all the data types to change, so I merged  `mockGenericQuerier` into `mockQuerier`.
Also replaced `unwrapMockGenericQuerier` with a visitor pattern.

While I was there, I addressed the comment from https://github.com/prometheus/prometheus/pull/13434#pullrequestreview-2191058921 to short-circuit the merge of single querier with any number of no-op or nil queriers.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-07-29 14:41:10 +01:00 committed by GitHub
parent 7b5897a46d
commit d186caead5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 204 additions and 164 deletions

View file

@ -45,25 +45,24 @@ type mergeGenericQuerier struct {
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
primaries = filterQueriers(primaries)
secondaries = filterQueriers(secondaries)
switch {
case len(primaries)+len(secondaries) == 0:
case len(primaries) == 0 && len(secondaries) == 0:
return noopQuerier{}
case len(primaries) == 1 && len(secondaries) == 0:
return primaries[0]
case len(primaries) == 0 && len(secondaries) == 1:
return secondaries[0]
return &querierAdapter{newSecondaryQuerierFrom(secondaries[0])}
}
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFrom(q))
}
queriers = append(queriers, newGenericQuerierFrom(q))
}
for _, q := range secondaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newSecondaryQuerierFrom(q))
}
queriers = append(queriers, newSecondaryQuerierFrom(q))
}
concurrentSelect := false
@ -77,31 +76,40 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer
}}
}
func filterQueriers(qs []Querier) []Querier {
ret := make([]Querier, 0, len(qs))
for _, q := range qs {
if _, ok := q.(noopQuerier); !ok && q != nil {
ret = append(ret, q)
}
}
return ret
}
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
primaries = filterChunkQueriers(primaries)
secondaries = filterChunkQueriers(secondaries)
switch {
case len(primaries) == 0 && len(secondaries) == 0:
return noopChunkQuerier{}
case len(primaries) == 1 && len(secondaries) == 0:
return primaries[0]
case len(primaries) == 0 && len(secondaries) == 1:
return secondaries[0]
return &chunkQuerierAdapter{newSecondaryQuerierFromChunk(secondaries[0])}
}
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFromChunk(q))
}
queriers = append(queriers, newGenericQuerierFromChunk(q))
}
for _, querier := range secondaries {
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
}
for _, q := range secondaries {
queriers = append(queriers, newSecondaryQuerierFromChunk(q))
}
concurrentSelect := false
@ -115,6 +123,16 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
}}
}
func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier {
ret := make([]ChunkQuerier, 0, len(qs))
for _, q := range qs {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
ret = append(ret, q)
}
}
return ret
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
seriesSets := make([]genericSeriesSet, 0, len(q.queriers))

View file

@ -912,9 +912,23 @@ func TestConcatenatingChunkIterator(t *testing.T) {
}
type mockQuerier struct {
LabelQuerier
mtx sync.Mutex
toReturn []Series
toReturn []Series // Response for Select.
closed bool
labelNamesCalls int
labelNamesRequested []labelNameRequest
sortedSeriesRequested []bool
resp []string // Response for LabelNames and LabelValues; turned into Select response if toReturn is not supplied.
warnings annotations.Annotations
err error
}
type labelNameRequest struct {
name string
matchers []*labels.Matcher
}
type seriesByLabel []Series
@ -924,13 +938,47 @@ func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
func (m *mockQuerier) Select(_ context.Context, sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet {
cpy := make([]Series, len(m.toReturn))
copy(cpy, m.toReturn)
m.mtx.Lock()
defer m.mtx.Unlock()
m.sortedSeriesRequested = append(m.sortedSeriesRequested, sortSeries)
var ret []Series
if len(m.toReturn) > 0 {
ret = make([]Series, len(m.toReturn))
copy(ret, m.toReturn)
} else if len(m.resp) > 0 {
ret = make([]Series, 0, len(m.resp))
for _, l := range m.resp {
ret = append(ret, NewListSeries(labels.FromStrings("test", l), nil))
}
}
if sortSeries {
sort.Sort(seriesByLabel(cpy))
sort.Sort(seriesByLabel(ret))
}
return NewMockSeriesSet(cpy...)
return &mockSeriesSet{idx: -1, series: ret, warnings: m.warnings, err: m.err}
}
func (m *mockQuerier) LabelValues(_ context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
m.mtx.Lock()
m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
name: name,
matchers: matchers,
})
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
func (m *mockQuerier) LabelNames(context.Context, *LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
m.mtx.Lock()
m.labelNamesCalls++
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
func (m *mockQuerier) Close() error {
m.closed = true
return nil
}
type mockChunkQuerier struct {
@ -960,6 +1008,9 @@ func (m *mockChunkQuerier) Select(_ context.Context, sortSeries bool, _ *SelectH
type mockSeriesSet struct {
idx int
series []Series
warnings annotations.Annotations
err error
}
func NewMockSeriesSet(series ...Series) SeriesSet {
@ -970,15 +1021,18 @@ func NewMockSeriesSet(series ...Series) SeriesSet {
}
func (m *mockSeriesSet) Next() bool {
if m.err != nil {
return false
}
m.idx++
return m.idx < len(m.series)
}
func (m *mockSeriesSet) At() Series { return m.series[m.idx] }
func (m *mockSeriesSet) Err() error { return nil }
func (m *mockSeriesSet) Err() error { return m.err }
func (m *mockSeriesSet) Warnings() annotations.Annotations { return nil }
func (m *mockSeriesSet) Warnings() annotations.Annotations { return m.warnings }
type mockChunkSeriesSet struct {
idx int
@ -1336,105 +1390,44 @@ func BenchmarkMergeSeriesSet(b *testing.B) {
}
}
type mockGenericQuerier struct {
mtx sync.Mutex
closed bool
labelNamesCalls int
labelNamesRequested []labelNameRequest
sortedSeriesRequested []bool
resp []string
warnings annotations.Annotations
err error
}
type labelNameRequest struct {
name string
matchers []*labels.Matcher
}
func (m *mockGenericQuerier) Select(_ context.Context, b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet {
m.mtx.Lock()
m.sortedSeriesRequested = append(m.sortedSeriesRequested, b)
m.mtx.Unlock()
return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err}
}
func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
m.mtx.Lock()
m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
name: name,
matchers: matchers,
})
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
func (m *mockGenericQuerier) LabelNames(context.Context, *LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
m.mtx.Lock()
m.labelNamesCalls++
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
func (m *mockGenericQuerier) Close() error {
m.closed = true
return nil
}
type mockGenericSeriesSet struct {
resp []string
warnings annotations.Annotations
err error
curr int
}
func (m *mockGenericSeriesSet) Next() bool {
if m.err != nil {
return false
func visitMockQueriers(t *testing.T, qr Querier, f func(t *testing.T, q *mockQuerier)) int {
count := 0
switch x := qr.(type) {
case *mockQuerier:
count++
f(t, x)
case *querierAdapter:
count += visitMockQueriersInGenericQuerier(t, x.genericQuerier, f)
}
if m.curr >= len(m.resp) {
return false
return count
}
func visitMockQueriersInGenericQuerier(t *testing.T, g genericQuerier, f func(t *testing.T, q *mockQuerier)) int {
count := 0
switch x := g.(type) {
case *mergeGenericQuerier:
for _, q := range x.queriers {
count += visitMockQueriersInGenericQuerier(t, q, f)
}
case *genericQuerierAdapter:
// Visitor for chunkQuerier not implemented.
count += visitMockQueriers(t, x.q, f)
case *secondaryQuerier:
count += visitMockQueriersInGenericQuerier(t, x.genericQuerier, f)
}
m.curr++
return true
return count
}
func (m *mockGenericSeriesSet) Err() error { return m.err }
func (m *mockGenericSeriesSet) Warnings() annotations.Annotations { return m.warnings }
func (m *mockGenericSeriesSet) At() Labels {
return mockLabels(m.resp[m.curr-1])
}
type mockLabels string
func (l mockLabels) Labels() labels.Labels {
return labels.FromStrings("test", string(l))
}
func unwrapMockGenericQuerier(t *testing.T, qr genericQuerier) *mockGenericQuerier {
m, ok := qr.(*mockGenericQuerier)
if !ok {
s, ok := qr.(*secondaryQuerier)
require.True(t, ok, "expected secondaryQuerier got something else")
m, ok = s.genericQuerier.(*mockGenericQuerier)
require.True(t, ok, "expected mockGenericQuerier got something else")
}
return m
}
func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
var (
errStorage = errors.New("storage error")
warnStorage = errors.New("storage warning")
ctx = context.Background()
)
for _, tcase := range []struct {
name string
queriers []genericQuerier
name string
primaries []Querier
secondaries []Querier
expectedSelectsSeries []labels.Labels
expectedLabels []string
@ -1443,10 +1436,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedErrs [4]error
}{
{
// NewMergeQuerier will not create a mergeGenericQuerier
// with just one querier inside, but we can test it anyway.
name: "one successful primary querier",
queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
name: "one successful primary querier",
primaries: []Querier{&mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
@ -1455,9 +1446,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "multiple successful primary queriers",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
&mockGenericQuerier{resp: []string{"b", "c"}, warnings: nil, err: nil},
primaries: []Querier{
&mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
&mockQuerier{resp: []string{"b", "c"}, warnings: nil, err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
@ -1468,15 +1459,17 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "one failed primary querier",
queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}},
primaries: []Querier{&mockQuerier{warnings: nil, err: errStorage}},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
primaries: []Querier{
&mockQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
},
secondaries: []Querier{
&mockQuerier{resp: []string{"b"}, warnings: nil, err: nil},
&mockQuerier{resp: []string{"c"}, warnings: nil, err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
@ -1487,10 +1480,12 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "one successful primary querier with empty response and successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
primaries: []Querier{
&mockQuerier{resp: []string{}, warnings: nil, err: nil},
},
secondaries: []Querier{
&mockQuerier{resp: []string{"b"}, warnings: nil, err: nil},
&mockQuerier{resp: []string{"c"}, warnings: nil, err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "b"),
@ -1500,19 +1495,42 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "one failed primary querier with successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{warnings: nil, err: errStorage},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
primaries: []Querier{
&mockQuerier{warnings: nil, err: errStorage},
},
secondaries: []Querier{
&mockQuerier{resp: []string{"b"}, warnings: nil, err: nil},
&mockQuerier{resp: []string{"c"}, warnings: nil, err: nil},
},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
},
{
name: "nil primary querier with failed secondary",
primaries: nil,
secondaries: []Querier{
&mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage},
},
expectedLabels: []string{},
expectedWarnings: annotations.New().Add(errStorage),
},
{
name: "nil primary querier with two failed secondaries",
primaries: nil,
secondaries: []Querier{
&mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage},
&mockQuerier{resp: []string{"c"}, warnings: nil, err: errStorage},
},
expectedLabels: []string{},
expectedWarnings: annotations.New().Add(errStorage),
},
{
name: "one successful primary querier with failed secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a"}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: errStorage}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: errStorage}},
primaries: []Querier{
&mockQuerier{resp: []string{"a"}, warnings: nil, err: nil},
},
secondaries: []Querier{
&mockQuerier{resp: []string{"b"}, warnings: nil, err: errStorage},
&mockQuerier{resp: []string{"c"}, warnings: nil, err: errStorage},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
@ -1522,9 +1540,11 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
{
name: "successful queriers with warnings",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a"}, warnings: annotations.New().Add(warnStorage), err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: annotations.New().Add(warnStorage), err: nil}},
primaries: []Querier{
&mockQuerier{resp: []string{"a"}, warnings: annotations.New().Add(warnStorage), err: nil},
},
secondaries: []Querier{
&mockQuerier{resp: []string{"b"}, warnings: annotations.New().Add(warnStorage), err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
@ -1535,10 +1555,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
},
} {
t.Run(tcase.name, func(t *testing.T) {
q := &mergeGenericQuerier{
queriers: tcase.queriers,
mergeFn: func(l ...Labels) Labels { return l[0] },
}
q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] })
t.Run("Select", func(t *testing.T) {
res := q.Select(context.Background(), false, nil)
@ -1551,65 +1568,70 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
require.ErrorIs(t, res.Err(), tcase.expectedErrs[0], "expected error doesn't match")
require.Equal(t, tcase.expectedSelectsSeries, lbls)
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
// mergeGenericQuerier forces all Selects to be sorted.
require.Equal(t, []bool{true}, m.sortedSeriesRequested)
}
n := visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) {
// Single queries should be unsorted; merged queries sorted.
exp := len(tcase.primaries)+len(tcase.secondaries) > 1
require.Equal(t, []bool{exp}, m.sortedSeriesRequested)
})
// Check we visited all queriers.
require.Equal(t, len(tcase.primaries)+len(tcase.secondaries), n)
})
t.Run("LabelNames", func(t *testing.T) {
res, w, err := q.LabelNames(ctx, nil)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[1], "expected error doesn't match")
require.Equal(t, tcase.expectedLabels, res)
requireEqualSlice(t, tcase.expectedLabels, res)
if err != nil {
return
}
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) {
require.Equal(t, 1, m.labelNamesCalls)
}
})
})
t.Run("LabelValues", func(t *testing.T) {
res, w, err := q.LabelValues(ctx, "test", nil)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[2], "expected error doesn't match")
require.Equal(t, tcase.expectedLabels, res)
requireEqualSlice(t, tcase.expectedLabels, res)
if err != nil {
return
}
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) {
require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested)
}
})
})
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
res, w, err := q.LabelValues(ctx, "test2", nil, matcher)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
require.Equal(t, tcase.expectedLabels, res)
requireEqualSlice(t, tcase.expectedLabels, res)
if err != nil {
return
}
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
visitMockQueriers(t, q, func(t *testing.T, m *mockQuerier) {
require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2", matchers: []*labels.Matcher{matcher}},
}, m.labelNamesRequested)
}
})
})
})
}
}
// Check slice but ignore difference between nil and empty.
func requireEqualSlice[T any](t require.TestingT, a, b []T, msgAndArgs ...interface{}) {
if len(a) == 0 {
require.Empty(t, b, msgAndArgs...)
} else {
require.Equal(t, a, b, msgAndArgs...)
}
}
type errIterator struct {
err error
}