Fixed incorrect query results caused by buffer reuse in merge adapter (#7361)

Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
Marco Pracucci 2020-06-08 20:55:53 +02:00 committed by GitHub
parent f674ccfdfc
commit 1e006d84a0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 12 deletions

View file

@ -195,10 +195,18 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
} }
qs = append(qs, tc.extraQueriers...) qs = append(qs, tc.extraQueriers...)
merged, _, _ := NewMergeQuerier(qs[0], qs, ChainedSeriesMerge).Select(false, nil) mergedQuerier, _, _ := NewMergeQuerier(qs[0], qs, ChainedSeriesMerge).Select(false, nil)
for merged.Next() {
// Get all merged series upfront to make sure there are no incorrectly retained shared
// buffers causing bugs.
var mergedSeries []Series
for mergedQuerier.Next() {
mergedSeries = append(mergedSeries, mergedQuerier.At())
}
testutil.Ok(t, mergedQuerier.Err())
for _, actualSeries := range mergedSeries {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true") testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
expectedSeries := tc.expected.At() expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels()) testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
@ -207,7 +215,6 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
testutil.Equals(t, expErr, actErr) testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expSmpl, actSmpl) testutil.Equals(t, expSmpl, actSmpl)
} }
testutil.Ok(t, merged.Err())
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false") testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
}) })
} }

View file

@ -108,26 +108,24 @@ func (q *chunkQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matche
type seriesMergerAdapter struct { type seriesMergerAdapter struct {
VerticalSeriesMergeFunc VerticalSeriesMergeFunc
buf []Series
} }
func (a *seriesMergerAdapter) Merge(s ...Labels) Labels { func (a *seriesMergerAdapter) Merge(s ...Labels) Labels {
a.buf = a.buf[:0] buf := make([]Series, 0, len(s))
for _, ser := range s { for _, ser := range s {
a.buf = append(a.buf, ser.(Series)) buf = append(buf, ser.(Series))
} }
return a.VerticalSeriesMergeFunc(a.buf...) return a.VerticalSeriesMergeFunc(buf...)
} }
type chunkSeriesMergerAdapter struct { type chunkSeriesMergerAdapter struct {
VerticalChunkSeriesMergerFunc VerticalChunkSeriesMergerFunc
buf []ChunkSeries
} }
func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels { func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels {
a.buf = a.buf[:0] buf := make([]ChunkSeries, 0, len(s))
for _, ser := range s { for _, ser := range s {
a.buf = append(a.buf, ser.(ChunkSeries)) buf = append(buf, ser.(ChunkSeries))
} }
return a.VerticalChunkSeriesMergerFunc(a.buf...) return a.VerticalChunkSeriesMergerFunc(buf...)
} }